Skip to content

Commit 361867f

Browse files
authored
add onConnectionInit handler for listener channelRegistered or handler added event (#15)
* add HttpServer#onConnectionInit(xx) * replace HttpServer#onConnected(Consumer<ChannelHandlerContext> h) to HttpServer#onConnected(Consumer<Channel> h)
1 parent 6af8b70 commit 361867f

File tree

5 files changed

+67
-11
lines changed

5 files changed

+67
-11
lines changed

httpserver/src/main/java/io/esastack/httpserver/HttpServer.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import io.esastack.httpserver.impl.HttpServerImpl;
2020
import io.esastack.httpserver.metrics.Metrics;
2121
import io.netty.channel.Channel;
22-
import io.netty.channel.ChannelHandlerContext;
2322
import io.netty.channel.EventLoopGroup;
2423
import io.netty.util.concurrent.Future;
2524

@@ -78,14 +77,23 @@ static HttpServer create(String name, ServerOptions options) {
7877
*/
7978
HttpServer handle(Consumer<RequestHandle> h);
8079

80+
/**
81+
* Sets the handler for listening connection init.
82+
*
83+
* @param h handler
84+
*
85+
* @return this
86+
*/
87+
HttpServer onConnectionInit(Consumer<Channel> h);
88+
8189
/**
8290
* Sets the handler for listening connection connected.
8391
*
8492
* @param h handler
8593
*
8694
* @return this
8795
*/
88-
HttpServer onConnected(Consumer<ChannelHandlerContext> h);
96+
HttpServer onConnected(Consumer<Channel> h);
8997

9098
/**
9199
* Sets the handler for listening connection disconnected.

httpserver/src/main/java/io/esastack/httpserver/impl/HttpServerChannelInitializr.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,17 +53,20 @@ final class HttpServerChannelInitializr extends ChannelInitializer<Channel> {
5353
private final ServerRuntime runtime;
5454
private final SslHelper sslHelper;
5555
private final Consumer<RequestHandle> handler;
56-
private final Consumer<ChannelHandlerContext> onConnected;
56+
private final Consumer<Channel> onConnectionInit;
57+
private final Consumer<Channel> onConnected;
5758
private final Consumer<Channel> onDisconnected;
5859

5960
HttpServerChannelInitializr(ServerRuntime runtime,
6061
SslHelper sslHelper,
6162
Consumer<RequestHandle> handler,
62-
Consumer<ChannelHandlerContext> onConnected,
63+
Consumer<Channel> onConnectionInit,
64+
Consumer<Channel> onConnected,
6365
Consumer<Channel> onDisconnected) {
6466
this.runtime = runtime;
6567
this.sslHelper = sslHelper;
6668
this.handler = handler;
69+
this.onConnectionInit = onConnectionInit;
6770
this.onConnected = onConnected;
6871
this.onDisconnected = onDisconnected;
6972
}
@@ -76,6 +79,14 @@ protected void initChannel(Channel ch) {
7679
return;
7780
}
7881

82+
if (onConnectionInit != null) {
83+
try {
84+
onConnectionInit.accept(ch);
85+
} catch (Throwable t) {
86+
Loggers.logger().warn("Error while processing onConnectionInit()", t);
87+
}
88+
}
89+
7990
final ChannelPipeline pipeline = ch.pipeline();
8091
// options for each accepted channel
8192
applyChannelOptions(ch);
@@ -91,7 +102,7 @@ protected void initChannel(Channel ch) {
91102
}
92103
if (onConnected != null) {
93104
try {
94-
onConnected.accept(ctx);
105+
onConnected.accept(ch);
95106
} catch (Throwable t) {
96107
Loggers.logger().warn("Error while processing onConnected()", t);
97108
}

httpserver/src/main/java/io/esastack/httpserver/impl/HttpServerImpl.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ public class HttpServerImpl implements HttpServer {
5151

5252
private final ServerRuntime runtime;
5353
private Consumer<RequestHandle> handler;
54-
private Consumer<ChannelHandlerContext> onConnected;
54+
private Consumer<Channel> onConnectionInit;
55+
private Consumer<Channel> onConnected;
5556
private Consumer<Channel> onDisconnected;
5657
private final CloseFuture closeFuture = new CloseFuture();
5758
private final CopyOnWriteArrayList<Runnable> closures = new CopyOnWriteArrayList<>();
@@ -78,7 +79,14 @@ public synchronized HttpServerImpl handle(Consumer<RequestHandle> h) {
7879
}
7980

8081
@Override
81-
public synchronized HttpServerImpl onConnected(Consumer<ChannelHandlerContext> h) {
82+
public HttpServer onConnectionInit(Consumer<Channel> h) {
83+
checkStarted();
84+
this.onConnectionInit = h;
85+
return this;
86+
}
87+
88+
@Override
89+
public synchronized HttpServerImpl onConnected(Consumer<Channel> h) {
8290
checkStarted();
8391
this.onConnected = h;
8492
return this;
@@ -140,7 +148,7 @@ public Future<Void> closeFuture() {
140148
private synchronized HttpServerImpl listen0(SocketAddress address) {
141149
checkStarted();
142150
Checks.checkNotNull(address, "address");
143-
Checks.checkNotNull(handler, "Request handler required");
151+
Checks.checkNotNull(handler, "Request handler required. Set it by HttpServer.handle(xxx)");
144152
final ServerBootstrap bootstrap = new ServerBootstrap();
145153

146154
final Transport transport = Transports.transport(options().isPreferNativeTransport());
@@ -153,6 +161,7 @@ private synchronized HttpServerImpl listen0(SocketAddress address) {
153161
bootstrap.childHandler(new HttpServerChannelInitializr(runtime,
154162
sslHelper,
155163
handler,
164+
onConnectionInit,
156165
onConnected,
157166
onDisconnected));
158167
final EventLoopGroup bossGroup = transport.loop(options().getBossThreads(),

httpserver/src/test/java/io/esastack/httpserver/impl/HttpServerChannelInitializrTest.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ void testCloseChannelIfServerIsShutdown() {
6767
new SslHelper(null, false),
6868
r -> r.response().end(),
6969
null,
70+
null,
7071
null);
7172
final EmbeddedChannel channel = new EmbeddedChannel(initializr);
7273
assertFalse(channel.isActive());
@@ -81,6 +82,7 @@ void testApplyWriteBufferHighWaterMark() {
8182
new SslHelper(null, false),
8283
r -> r.response().end(),
8384
null,
85+
null,
8486
null);
8587
final EmbeddedChannel channel = new EmbeddedChannel(initializr);
8688
assertEquals(WriteBufferWaterMark.DEFAULT.high() + 1,
@@ -96,6 +98,7 @@ void testApplyWriteBufferLowWaterMark() {
9698
new SslHelper(null, false),
9799
r -> r.response().end(),
98100
null,
101+
null,
99102
null);
100103
final EmbeddedChannel channel = new EmbeddedChannel(initializr);
101104
assertEquals(WriteBufferWaterMark.DEFAULT.low() - 1,
@@ -112,6 +115,7 @@ void testApplyWriteBufferWaterMark() {
112115
new SslHelper(null, false),
113116
r -> r.response().end(),
114117
null,
118+
null,
115119
null);
116120
final EmbeddedChannel channel = new EmbeddedChannel(initializr);
117121
assertEquals(WriteBufferWaterMark.DEFAULT.low() - 1,
@@ -129,6 +133,7 @@ void testHAProxyDetectorInitialization() {
129133
new SslHelper(null, false),
130134
r -> r.response().end(),
131135
null,
136+
null,
132137
null);
133138
final EmbeddedChannel channel = new EmbeddedChannel(initializr);
134139
assertNotNull(channel.pipeline().get(HAProxyDetector.class));
@@ -144,6 +149,7 @@ void testHAProxyDecoderInitialization() {
144149
new SslHelper(null, false),
145150
r -> r.response().end(),
146151
null,
152+
null,
147153
null);
148154
final EmbeddedChannel channel = new EmbeddedChannel(initializr);
149155
assertNotNull(channel.pipeline().get(HAProxyMessageDecoder.class));
@@ -162,14 +168,16 @@ void testHAProxyOffInitialization() {
162168
new SslHelper(null, false),
163169
r -> r.response().end(),
164170
null,
171+
null,
165172
null);
166173
final EmbeddedChannel channel = new EmbeddedChannel(initializr);
167174
assertNull(channel.pipeline().get(HAProxyMessageDecoder.class));
168175
assertNull(channel.pipeline().get(HAProxyMessageHandler.class));
169176
}
170177

171178
@Test
172-
void testChannelActiveAndInactiveListener() {
179+
void testChannelInitAndActiveAndInactiveListener() {
180+
final AtomicBoolean init = new AtomicBoolean();
173181
final AtomicBoolean active = new AtomicBoolean();
174182
final AtomicBoolean inActive = new AtomicBoolean();
175183
final ServerRuntime runtime = Helper.serverRuntime(ServerOptionsConfigure.newOpts()
@@ -179,11 +187,13 @@ void testChannelActiveAndInactiveListener() {
179187
new HttpServerChannelInitializr(runtime,
180188
new SslHelper(null, false),
181189
r -> r.response().end(),
182-
ctx -> active.set(true),
190+
c -> init.set(true),
191+
c -> active.set(true),
183192
c -> inActive.set(true));
184193

185194
final EmbeddedChannel channel = new EmbeddedChannel(initializr);
186195

196+
assertTrue(init.get());
187197
assertTrue(active.get());
188198
assertFalse(inActive.get());
189199

@@ -203,6 +213,7 @@ void testChannelActiveAndInactiveListener() {
203213

204214
@Test
205215
void testErrorOccurredInConnectionHandlerShouldBeIgnored() {
216+
final AtomicBoolean init = new AtomicBoolean();
206217
final AtomicBoolean active = new AtomicBoolean();
207218
final AtomicBoolean inActive = new AtomicBoolean();
208219
final ServerRuntime runtime = Helper.serverRuntime(ServerOptionsConfigure.newOpts()
@@ -212,7 +223,11 @@ void testErrorOccurredInConnectionHandlerShouldBeIgnored() {
212223
new HttpServerChannelInitializr(runtime,
213224
new SslHelper(null, false),
214225
r -> r.response().end(),
215-
ctx -> {
226+
c -> {
227+
init.set(true);
228+
ExceptionUtils.throwException(new IllegalStateException());
229+
},
230+
c -> {
216231
active.set(true);
217232
ExceptionUtils.throwException(new IllegalStateException());
218233
},
@@ -223,6 +238,7 @@ void testErrorOccurredInConnectionHandlerShouldBeIgnored() {
223238

224239
final EmbeddedChannel channel = new EmbeddedChannel(initializr);
225240

241+
assertTrue(init.get());
226242
assertTrue(active.get());
227243
assertFalse(inActive.get());
228244
channel.close();
@@ -426,6 +442,7 @@ private static void testUnsupportedUpgradeProtocol(boolean logging,
426442
new SslHelper(null, false),
427443
r -> r.response().end(),
428444
null,
445+
null,
429446
null);
430447

431448
final EmbeddedChannel channel = new EmbeddedChannel(initializr);
@@ -502,6 +519,7 @@ private static void testUpgradeError(boolean logging,
502519
});
503520
},
504521
null,
522+
null,
505523
null);
506524

507525
final EmbeddedChannel channel = new EmbeddedChannel(initializr);
@@ -573,6 +591,7 @@ private static void testUpgrade(boolean logging,
573591
new SslHelper(null, false),
574592
r -> r.response().end(),
575593
null,
594+
null,
576595
null);
577596

578597
final EmbeddedChannel channel = new EmbeddedChannel(initializr);
@@ -637,6 +656,7 @@ private static void testHttp1HandlerInitialization(boolean logging,
637656
new SslHelper(null, false),
638657
r -> r.response().end(),
639658
null,
659+
null,
640660
null);
641661

642662
final EmbeddedChannel channel = new EmbeddedChannel(initializr);
@@ -681,6 +701,7 @@ private static void testSslDetectFailedWithHttp2Disabled(boolean logging,
681701
new SslHelper(runtime.options().getSsl(), false),
682702
r -> r.response().end(),
683703
null,
704+
null,
684705
null);
685706

686707
final EmbeddedChannel channel = new EmbeddedChannel(initializr);
@@ -737,6 +758,7 @@ private static void testSslDetectFailedWithHttp2Enabled(boolean logging,
737758
new SslHelper(runtime.options().getSsl(), false),
738759
r -> r.response().end(),
739760
null,
761+
null,
740762
null);
741763

742764
final EmbeddedChannel channel = new EmbeddedChannel(initializr);
@@ -805,6 +827,7 @@ private static void testH1HandlerInitializationAfterH2cDetector(boolean logging,
805827
new SslHelper(null, false),
806828
r -> r.response().end(),
807829
null,
830+
null,
808831
null);
809832

810833
final EmbeddedChannel channel = new EmbeddedChannel(initializr);
@@ -869,6 +892,7 @@ private static void testH2cHandlerInitializationAfterH2cDetector(boolean logging
869892
new SslHelper(null, false),
870893
r -> r.response().end(),
871894
null,
895+
null,
872896
null);
873897

874898
final EmbeddedChannel channel = new EmbeddedChannel(initializr);

httpserver/src/test/java/io/esastack/httpserver/impl/HttpServerImplTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,13 @@ void testStatus() throws InterruptedException {
5151

5252

5353
final AtomicBoolean onClose = new AtomicBoolean();
54+
final AtomicBoolean onConnectionInit = new AtomicBoolean();
5455
final AtomicBoolean onConnected = new AtomicBoolean();
5556
final AtomicBoolean onDisconnected = new AtomicBoolean();
5657
final AtomicBoolean onHandle = new AtomicBoolean();
5758
final CompletableFuture<Boolean> closed = new CompletableFuture<>();
5859
assertDoesNotThrow(() -> server.onClose(() -> onClose.set(true)));
60+
assertDoesNotThrow(() -> server.onConnectionInit(ctx -> onConnectionInit.set(true)));
5961
assertDoesNotThrow(() -> server.onConnected(ctx -> onConnected.set(true)));
6062
assertDoesNotThrow(() -> server.onDisconnected(ctx -> onDisconnected.set(true)));
6163
assertDoesNotThrow(() -> server.handle(ctx -> onHandle.set(true)));
@@ -100,10 +102,12 @@ void testStatus() throws InterruptedException {
100102
assertNotNull(server.bossGroup());
101103
assertNotNull(server.ioGroup());
102104

105+
assertThrows(IllegalStateException.class, () -> server.onConnectionInit(ctx -> onConnected.set(true)));
103106
assertThrows(IllegalStateException.class, () -> server.onConnected(ctx -> onConnected.set(true)));
104107
assertThrows(IllegalStateException.class, () -> server.onDisconnected(ctx -> onDisconnected.set(true)));
105108
assertThrows(IllegalStateException.class, () -> server.handle(ctx -> onHandle.set(true)));
106109

110+
assertFalse(onConnectionInit.get());
107111
assertFalse(onConnected.get());
108112
assertFalse(onDisconnected.get());
109113
assertFalse(onHandle.get());

0 commit comments

Comments
 (0)