Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions httpserver/src/main/java/io/esastack/httpserver/HttpServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.esastack.httpserver.impl.HttpServerImpl;
import io.esastack.httpserver.metrics.Metrics;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.Future;

Expand Down Expand Up @@ -78,14 +77,23 @@ static HttpServer create(String name, ServerOptions options) {
*/
HttpServer handle(Consumer<RequestHandle> h);

/**
* Sets the handler for listening connection init.
*
* @param h handler
*
* @return this
*/
HttpServer onConnectionInit(Consumer<Channel> h);

/**
* Sets the handler for listening connection connected.
*
* @param h handler
*
* @return this
*/
HttpServer onConnected(Consumer<ChannelHandlerContext> h);
HttpServer onConnected(Consumer<Channel> h);

/**
* Sets the handler for listening connection disconnected.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,20 @@ final class HttpServerChannelInitializr extends ChannelInitializer<Channel> {
private final ServerRuntime runtime;
private final SslHelper sslHelper;
private final Consumer<RequestHandle> handler;
private final Consumer<ChannelHandlerContext> onConnected;
private final Consumer<Channel> onConnectionInit;
private final Consumer<Channel> onConnected;
private final Consumer<Channel> onDisconnected;

HttpServerChannelInitializr(ServerRuntime runtime,
SslHelper sslHelper,
Consumer<RequestHandle> handler,
Consumer<ChannelHandlerContext> onConnected,
Consumer<Channel> onConnectionInit,
Consumer<Channel> onConnected,
Consumer<Channel> onDisconnected) {
this.runtime = runtime;
this.sslHelper = sslHelper;
this.handler = handler;
this.onConnectionInit = onConnectionInit;
this.onConnected = onConnected;
this.onDisconnected = onDisconnected;
}
Expand All @@ -76,6 +79,14 @@ protected void initChannel(Channel ch) {
return;
}

if (onConnectionInit != null) {
try {
onConnectionInit.accept(ch);
} catch (Throwable t) {
Loggers.logger().warn("Error while processing onConnectionInit()", t);
}
}

final ChannelPipeline pipeline = ch.pipeline();
// options for each accepted channel
applyChannelOptions(ch);
Expand All @@ -91,7 +102,7 @@ protected void initChannel(Channel ch) {
}
if (onConnected != null) {
try {
onConnected.accept(ctx);
onConnected.accept(ch);
} catch (Throwable t) {
Loggers.logger().warn("Error while processing onConnected()", t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public class HttpServerImpl implements HttpServer {

private final ServerRuntime runtime;
private Consumer<RequestHandle> handler;
private Consumer<ChannelHandlerContext> onConnected;
private Consumer<Channel> onConnectionInit;
private Consumer<Channel> onConnected;
private Consumer<Channel> onDisconnected;
private final CloseFuture closeFuture = new CloseFuture();
private final CopyOnWriteArrayList<Runnable> closures = new CopyOnWriteArrayList<>();
Expand All @@ -78,7 +79,14 @@ public synchronized HttpServerImpl handle(Consumer<RequestHandle> h) {
}

@Override
public synchronized HttpServerImpl onConnected(Consumer<ChannelHandlerContext> h) {
public HttpServer onConnectionInit(Consumer<Channel> h) {
checkStarted();
this.onConnectionInit = h;
return this;
}

@Override
public synchronized HttpServerImpl onConnected(Consumer<Channel> h) {
checkStarted();
this.onConnected = h;
return this;
Expand Down Expand Up @@ -140,7 +148,7 @@ public Future<Void> closeFuture() {
private synchronized HttpServerImpl listen0(SocketAddress address) {
checkStarted();
Checks.checkNotNull(address, "address");
Checks.checkNotNull(handler, "Request handler required");
Checks.checkNotNull(handler, "Request handler required. Set it by HttpServer.handle(xxx)");
final ServerBootstrap bootstrap = new ServerBootstrap();

final Transport transport = Transports.transport(options().isPreferNativeTransport());
Expand All @@ -153,6 +161,7 @@ private synchronized HttpServerImpl listen0(SocketAddress address) {
bootstrap.childHandler(new HttpServerChannelInitializr(runtime,
sslHelper,
handler,
onConnectionInit,
onConnected,
onDisconnected));
final EventLoopGroup bossGroup = transport.loop(options().getBossThreads(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ void testCloseChannelIfServerIsShutdown() {
new SslHelper(null, false),
r -> r.response().end(),
null,
null,
null);
final EmbeddedChannel channel = new EmbeddedChannel(initializr);
assertFalse(channel.isActive());
Expand All @@ -81,6 +82,7 @@ void testApplyWriteBufferHighWaterMark() {
new SslHelper(null, false),
r -> r.response().end(),
null,
null,
null);
final EmbeddedChannel channel = new EmbeddedChannel(initializr);
assertEquals(WriteBufferWaterMark.DEFAULT.high() + 1,
Expand All @@ -96,6 +98,7 @@ void testApplyWriteBufferLowWaterMark() {
new SslHelper(null, false),
r -> r.response().end(),
null,
null,
null);
final EmbeddedChannel channel = new EmbeddedChannel(initializr);
assertEquals(WriteBufferWaterMark.DEFAULT.low() - 1,
Expand All @@ -112,6 +115,7 @@ void testApplyWriteBufferWaterMark() {
new SslHelper(null, false),
r -> r.response().end(),
null,
null,
null);
final EmbeddedChannel channel = new EmbeddedChannel(initializr);
assertEquals(WriteBufferWaterMark.DEFAULT.low() - 1,
Expand All @@ -129,6 +133,7 @@ void testHAProxyDetectorInitialization() {
new SslHelper(null, false),
r -> r.response().end(),
null,
null,
null);
final EmbeddedChannel channel = new EmbeddedChannel(initializr);
assertNotNull(channel.pipeline().get(HAProxyDetector.class));
Expand All @@ -144,6 +149,7 @@ void testHAProxyDecoderInitialization() {
new SslHelper(null, false),
r -> r.response().end(),
null,
null,
null);
final EmbeddedChannel channel = new EmbeddedChannel(initializr);
assertNotNull(channel.pipeline().get(HAProxyMessageDecoder.class));
Expand All @@ -162,14 +168,16 @@ void testHAProxyOffInitialization() {
new SslHelper(null, false),
r -> r.response().end(),
null,
null,
null);
final EmbeddedChannel channel = new EmbeddedChannel(initializr);
assertNull(channel.pipeline().get(HAProxyMessageDecoder.class));
assertNull(channel.pipeline().get(HAProxyMessageHandler.class));
}

@Test
void testChannelActiveAndInactiveListener() {
void testChannelInitAndActiveAndInactiveListener() {
final AtomicBoolean init = new AtomicBoolean();
final AtomicBoolean active = new AtomicBoolean();
final AtomicBoolean inActive = new AtomicBoolean();
final ServerRuntime runtime = Helper.serverRuntime(ServerOptionsConfigure.newOpts()
Expand All @@ -179,11 +187,13 @@ void testChannelActiveAndInactiveListener() {
new HttpServerChannelInitializr(runtime,
new SslHelper(null, false),
r -> r.response().end(),
ctx -> active.set(true),
c -> init.set(true),
c -> active.set(true),
c -> inActive.set(true));

final EmbeddedChannel channel = new EmbeddedChannel(initializr);

assertTrue(init.get());
assertTrue(active.get());
assertFalse(inActive.get());

Expand All @@ -203,6 +213,7 @@ void testChannelActiveAndInactiveListener() {

@Test
void testErrorOccurredInConnectionHandlerShouldBeIgnored() {
final AtomicBoolean init = new AtomicBoolean();
final AtomicBoolean active = new AtomicBoolean();
final AtomicBoolean inActive = new AtomicBoolean();
final ServerRuntime runtime = Helper.serverRuntime(ServerOptionsConfigure.newOpts()
Expand All @@ -212,7 +223,11 @@ void testErrorOccurredInConnectionHandlerShouldBeIgnored() {
new HttpServerChannelInitializr(runtime,
new SslHelper(null, false),
r -> r.response().end(),
ctx -> {
c -> {
init.set(true);
ExceptionUtils.throwException(new IllegalStateException());
},
c -> {
active.set(true);
ExceptionUtils.throwException(new IllegalStateException());
},
Expand All @@ -223,6 +238,7 @@ void testErrorOccurredInConnectionHandlerShouldBeIgnored() {

final EmbeddedChannel channel = new EmbeddedChannel(initializr);

assertTrue(init.get());
assertTrue(active.get());
assertFalse(inActive.get());
channel.close();
Expand Down Expand Up @@ -426,6 +442,7 @@ private static void testUnsupportedUpgradeProtocol(boolean logging,
new SslHelper(null, false),
r -> r.response().end(),
null,
null,
null);

final EmbeddedChannel channel = new EmbeddedChannel(initializr);
Expand Down Expand Up @@ -502,6 +519,7 @@ private static void testUpgradeError(boolean logging,
});
},
null,
null,
null);

final EmbeddedChannel channel = new EmbeddedChannel(initializr);
Expand Down Expand Up @@ -573,6 +591,7 @@ private static void testUpgrade(boolean logging,
new SslHelper(null, false),
r -> r.response().end(),
null,
null,
null);

final EmbeddedChannel channel = new EmbeddedChannel(initializr);
Expand Down Expand Up @@ -637,6 +656,7 @@ private static void testHttp1HandlerInitialization(boolean logging,
new SslHelper(null, false),
r -> r.response().end(),
null,
null,
null);

final EmbeddedChannel channel = new EmbeddedChannel(initializr);
Expand Down Expand Up @@ -681,6 +701,7 @@ private static void testSslDetectFailedWithHttp2Disabled(boolean logging,
new SslHelper(runtime.options().getSsl(), false),
r -> r.response().end(),
null,
null,
null);

final EmbeddedChannel channel = new EmbeddedChannel(initializr);
Expand Down Expand Up @@ -737,6 +758,7 @@ private static void testSslDetectFailedWithHttp2Enabled(boolean logging,
new SslHelper(runtime.options().getSsl(), false),
r -> r.response().end(),
null,
null,
null);

final EmbeddedChannel channel = new EmbeddedChannel(initializr);
Expand Down Expand Up @@ -805,6 +827,7 @@ private static void testH1HandlerInitializationAfterH2cDetector(boolean logging,
new SslHelper(null, false),
r -> r.response().end(),
null,
null,
null);

final EmbeddedChannel channel = new EmbeddedChannel(initializr);
Expand Down Expand Up @@ -869,6 +892,7 @@ private static void testH2cHandlerInitializationAfterH2cDetector(boolean logging
new SslHelper(null, false),
r -> r.response().end(),
null,
null,
null);

final EmbeddedChannel channel = new EmbeddedChannel(initializr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,13 @@ void testStatus() throws InterruptedException {


final AtomicBoolean onClose = new AtomicBoolean();
final AtomicBoolean onConnectionInit = new AtomicBoolean();
final AtomicBoolean onConnected = new AtomicBoolean();
final AtomicBoolean onDisconnected = new AtomicBoolean();
final AtomicBoolean onHandle = new AtomicBoolean();
final CompletableFuture<Boolean> closed = new CompletableFuture<>();
assertDoesNotThrow(() -> server.onClose(() -> onClose.set(true)));
assertDoesNotThrow(() -> server.onConnectionInit(ctx -> onConnectionInit.set(true)));
assertDoesNotThrow(() -> server.onConnected(ctx -> onConnected.set(true)));
assertDoesNotThrow(() -> server.onDisconnected(ctx -> onDisconnected.set(true)));
assertDoesNotThrow(() -> server.handle(ctx -> onHandle.set(true)));
Expand Down Expand Up @@ -100,10 +102,12 @@ void testStatus() throws InterruptedException {
assertNotNull(server.bossGroup());
assertNotNull(server.ioGroup());

assertThrows(IllegalStateException.class, () -> server.onConnectionInit(ctx -> onConnected.set(true)));
assertThrows(IllegalStateException.class, () -> server.onConnected(ctx -> onConnected.set(true)));
assertThrows(IllegalStateException.class, () -> server.onDisconnected(ctx -> onDisconnected.set(true)));
assertThrows(IllegalStateException.class, () -> server.handle(ctx -> onHandle.set(true)));

assertFalse(onConnectionInit.get());
assertFalse(onConnected.get());
assertFalse(onDisconnected.get());
assertFalse(onHandle.get());
Expand Down