AMF-991: Support for max.connection.duration.ms #483

Open · wants to merge 2 commits into master
5 changes: 5 additions & 0 deletions core/pom.xml
@@ -257,5 +257,10 @@
      <artifactId>mockito-inline</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.squareup.okhttp3</groupId>
      <artifactId>okhttp</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>
13 changes: 13 additions & 0 deletions core/src/main/java/io/confluent/rest/Application.java
@@ -28,6 +28,7 @@
import io.confluent.rest.exceptions.JsonMappingExceptionMapper;
import io.confluent.rest.exceptions.JsonParseExceptionMapper;
import io.confluent.rest.extension.ResourceExtension;
import io.confluent.rest.filters.ConnectionDurationFilter;
import io.confluent.rest.filters.CsrfTokenProtectionFilter;
import io.confluent.rest.handlers.SniHandler;
import io.confluent.rest.metrics.Jetty429MetricsDosFilterListener;
@@ -402,6 +403,8 @@ public Handler configureHandler() {

    configureDosFilters(context);

    configureConnectionDurationFilter(context);

    configurePreResourceHandling(context);
    context.addFilter(servletHolder, "/*", null);
    configurePostResourceHandling(context);
@@ -751,6 +754,16 @@ private void configureGlobalDosFilter(ServletContextHandler context) {
    context.addFilter(filterHolder, "/*", EnumSet.of(DispatcherType.REQUEST));
  }

  private void configureConnectionDurationFilter(ServletContextHandler context) {
    if (config.getMaxConnectionDuration() > 0) {
      FilterHolder filterHolder = new FilterHolder(ConnectionDurationFilter.class);
      filterHolder.setName("connection-duration-filter");
      filterHolder.setInitParameter(RestConfig.MAX_CONNECTION_DURATION_MS,
          String.valueOf(config.getMaxConnectionDuration()));
      context.addFilter(filterHolder, "/*", EnumSet.of(DispatcherType.REQUEST));
    }
  }

  private FilterHolder configureDosFilter(DoSFilter dosFilter, String rate) {

    FilterHolder filterHolder = new FilterHolder(dosFilter);
16 changes: 16 additions & 0 deletions core/src/main/java/io/confluent/rest/RestConfig.java
@@ -478,6 +478,13 @@ public class RestConfig extends AbstractConfig {
+ "If the limit is set to a non-positive number, no limit is applied. Default is 0.";
private static final int SERVER_CONNECTION_LIMIT_DEFAULT = 0;

  public static final String MAX_CONNECTION_DURATION_MS = "max.connection.duration.ms";
  public static final String MAX_CONNECTION_DURATION_MS_DOC =
      "The maximum duration in milliseconds that a connection can be open. "
Member:

Is it worth clarifying that this is the TCP connection the requests come over, to help differentiate this config from the one documented as "Length of time, in ms, to allow the request to run. Default is 30000L."?

Member:

+1

Member (@msn-tldr, May 30, 2024):

CMIIW. As I understand it, this config only applies to connections that are not idle, i.e. connections that are receiving HTTP requests. So should this be renamed to MAX_NON_IDLE_TCP_CONNECTION_DURATION_MS?

And add to the comment that for idle connections MAX_IDLE_TIMEOUT_MS should be configured appropriately. Wdyt?

Contributor Author:

Good points on the documentation update. Will fix.

On the naming of the config item, MAX_NON_IDLE_TCP_CONNECTION_DURATION_MS could be confusing -- one might wonder whether the counter starts only when the connection is non-idle and gets reset while it is idle, and all sorts of similar things. The config is an absolute maximum limit for how long a single connection is allowed to be open, regardless of whether that's idle or not.

Member:

The config is an absolute maximum limit for how long a single connection is allowed to be open, regardless of whether that's idle or not.

Is this correct though?

A Jetty filter runs on a request or a response. So if the connection is idle, i.e. there is no request or response, this filter won't run, and the config only takes effect for non-idle connections. That's why I suggested above (quoted below) that for idle connections MAX_IDLE_TIMEOUT_MS should be set.

And add to the comment that for idle connections MAX_IDLE_TIMEOUT_MS should be configured appropriately.

Re. naming, I am ok with the current name as naming is hard 😃. As long as the comment makes the right behaviour explicit.

Contributor Author:

Ah yes, you are right about the caveat with idle connections. My narrow focus in adding this feature was on very active connections that last days or weeks.
A combination of the "idle timeout" and this "max connection duration" (reworded appropriately) should go hand in hand in any config tuning.
I will update the documentation to reflect that.

+ "If a connection is open for longer than this duration, it will be closed. "
+ "If set to 0, no limit is applied. Default is 0.";
protected static final long MAX_CONNECTION_DURATION_MS_DEFAULT = 0;
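
For context on how these settings combine, a minimal configuration sketch (not part of this PR): MyConfig and the listener address are made up for illustration, and the idle.timeout.ms key is assumed to be the one behind the MAX_IDLE_TIMEOUT_MS setting mentioned in the thread above.

import com.google.common.collect.ImmutableMap;
import io.confluent.rest.RestConfig;

// Sketch only: a RestConfig subclass wiring up both limits, mirroring the
// MyAddressConfig pattern used in the tests further down.
public class MyConfig extends RestConfig {
  public MyConfig() {
    super(baseConfigDef(), ImmutableMap.of(
        "listeners", "http://0.0.0.0:8080",
        // close any (non-idle) connection that has been open for more than 10 minutes
        "max.connection.duration.ms", "600000",
        // idle connections are covered by the idle timeout instead (key name assumed)
        "idle.timeout.ms", "30000"));
  }
}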

  // For rest-utils applications connectors correspond to configured listeners. See
  // ApplicationServer#parseListeners for more details.
  private static final String CONNECTOR_CONNECTION_LIMIT = "connector.connection.limit";
@@ -1050,6 +1057,11 @@ private static ConfigDef incompleteBaseConfigDef() {
        SERVER_CONNECTION_LIMIT_DEFAULT,
        Importance.LOW,
        SERVER_CONNECTION_LIMIT_DOC
    ).define(MAX_CONNECTION_DURATION_MS,
        Type.LONG,
        MAX_CONNECTION_DURATION_MS_DEFAULT,
        Importance.LOW,
        MAX_CONNECTION_DURATION_MS_DOC
    ).define(
        CONNECTOR_CONNECTION_LIMIT,
        Type.INT,
@@ -1258,6 +1270,10 @@ public final int getServerConnectionLimit() {
    return getInt(SERVER_CONNECTION_LIMIT);
  }

  public final long getMaxConnectionDuration() {
    return getLong(MAX_CONNECTION_DURATION_MS);
  }

  public final int getConnectorConnectionLimit() {
    return getInt(CONNECTOR_CONNECTION_LIMIT);
  }
86 changes: 86 additions & 0 deletions core/src/main/java/io/confluent/rest/filters/ConnectionDurationFilter.java
@@ -0,0 +1,86 @@
/*
* Copyright 2024 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.confluent.rest.filters;

import io.confluent.rest.RestConfig;
import java.io.IOException;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Simple server-side request filter that limits the connection age.
*
* <p>Long-running client connections can be problematic for multiple server instances behind
* an NLB due to uneven load distribution (especially in the case of HTTP/2.0, which specifically
Member:

Even HTTP/1.1 uses persistent connections by default, so this could be helpful there as well.

Contributor Author:

Good point. Will clarify with a documentation update.

* encourages long-lived connections). This filter closes any connection that receives a request
* after the connection has been open for {@link RestConfig#MAX_CONNECTION_DURATION_MS} ms.</p>
*/
public class ConnectionDurationFilter implements Filter {

  private static final Logger log = LoggerFactory.getLogger(ConnectionDurationFilter.class);
  private static long MAX_CONNECTION_DURATION_MS = -1;
Member:

is there a reason why MAX_CONNECTION_DURATION_MS is static?

Contributor Author:

Typo, will fix. Thanks for pointing out.


  @Override
  public void init(FilterConfig filterConfig) throws ServletException {
    String maxConnectionDuration = filterConfig.getInitParameter(
        RestConfig.MAX_CONNECTION_DURATION_MS);
    if (maxConnectionDuration != null) {
      MAX_CONNECTION_DURATION_MS = Long.parseLong(maxConnectionDuration);
    }
  }

  @Override
  public void doFilter(
      ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
      throws IOException, ServletException {

    HttpServletResponse httpServletResponse = (HttpServletResponse) servletResponse;

    HttpChannel channel = ((Response) httpServletResponse).getHttpChannel();
Member:

We use several unchecked casts here; are they all safe?

Member:

+1

Contributor Author:

Good point. The typecasts should in general be safe, given that rest-utils specifically targets HTTP request serving, but I will add some checks just to be on the safe side and to avoid any issues in the future.


    long connectionCreationTime = channel.getConnection().getCreatedTimeStamp();
    long connectionAge = System.currentTimeMillis() - connectionCreationTime;

    if (connectionAge > MAX_CONNECTION_DURATION_MS) {
      log.debug("Connection from remote peer {} has been active for {}ms. Closing the connection.",
          channel.getRemoteAddress(), connectionAge);
      channel.getEndPoint().close();
Member:

should we also use the filterChain to stop processing?

Contributor Author:

Not sure I know how to do that. "Don't keep processing the request" sounds reasonable (which is what I think you are referring to), but unfortunately FilterChain only exposes a doFilter method.
Do you have a suggestion on how to go about it?

Member:

You can do that by not calling filterChain.doFilter, i.e. by returning early in the if block at line 68.

    } else {
      log.trace("Connection from remote peer {} is {}ms old. Leaving the connection as is",
          channel.getRemoteAddress(), connectionAge);
    }

    filterChain.doFilter(servletRequest, servletResponse);
  }

  @Override
  public void destroy() {
  }
}
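
For illustration, a sketch of how the doFilter body might look after the follow-ups promised in the threads above (a type check before the casts, the field made non-static and renamed to maxConnectionDurationMs, and an early return instead of continuing the filter chain). This is an assumption about the follow-up change, not code from the PR; the Jetty calls are the ones already used above.

  @Override
  public void doFilter(
      ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
      throws IOException, ServletException {

    // Guard the cast: only Jetty's Response exposes the underlying HttpChannel.
    if (servletResponse instanceof Response) {
      HttpChannel channel = ((Response) servletResponse).getHttpChannel();
      long connectionAge =
          System.currentTimeMillis() - channel.getConnection().getCreatedTimeStamp();

      if (connectionAge > maxConnectionDurationMs) {
        log.debug("Connection from remote peer {} has been active for {}ms. Closing the connection.",
            channel.getRemoteAddress(), connectionAge);
        channel.getEndPoint().close();
        // Stop here: do not invoke the rest of the filter chain for this request.
        return;
      }
    }

    filterChain.doFilter(servletRequest, servletResponse);
  }
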
Member:

I don't really see the test where the client connection is terminated due to exceeding the max duration, could you please give some pointer?

Contributor Author:

The current tests echo back the client address (IP address + client-port), which is a proxy for the connection itself.
If the same connection is reused, then the (IP address + client-port) pair stays the same across the requests.
If a new connection is established, the client-port changes.
The current tests try to make a number of calls with a delay longer than the "max connection duration", and assert that a new (IP address + client-port) was used for each of them.

I now see how that is somewhat obtuse, and all this explanation probably needs to make it to the test itself as a comment. What do you think? Does that make sense and address the need for the tests?

On why I had to choose this complicated, roundabout way of verifying that the connections are terminated: HTTP clients (both the Apache HTTP client and the OkHttp client) don't really have any public interface to give socket-level information back to the user. Both of them like to use internal mechanisms to maintain connection pools or attempt retries without necessarily exposing what's happening at the connection level back to the user. That's where I ended up conjuring up a server application that echoes back the (client IP + client-port) pair.

I should also add a case where multiple requests are made, all within the "max connection duration", and show that they indeed use the same connection under the hood.

Member:

Merging the other comment https://github.com/confluentinc/rest-utils/pull/483/files#r1621942383 here as they are similar.

I now see how that is somewhat obtuse, and all this explanation probably needs to make it to the test itself as a comment.

+1

Both of them like to use internal mechanisms to maintain connection pools or attempt retries without necessarily exposing what's happening at the connection level back to the user.

Is it possible to set the HTTP retries to 0 in the http clients? Then the client only sends the HTTP request once: max_duration is exceeded -> connection closed -> request fails. As retries are 0, the error/exception is passed back to the test.

It would be great to have the behaviour (connection disconnects -> test sees error) explicit in the test, as that is much easier to reason about in the future.

Member:

I think you can test this indirectly with the client sending multiple requests over a span of time; since the connection exceeds the max time, the remaining requests will not complete.

Contributor Author:

Yeah, setting retries to 0 sounds like a good idea.
Let me rework the tests a bit, incorporating all the feedback. Thank you for all the suggestions.

I also realized that the "ephemeral client" is not really an ephemeral client. This snippet of code, which I got from some other tests in the repo, doesn't actually create ephemeral clients. Setting the keep-alive duration to "-1" essentially means "no suggestion on the keep-alive strategy" (ref). 🤦

private static CloseableHttpClient createEphemeralClient() {
    return HttpClients.custom()
        .setConnectionManager(new BasicHttpClientConnectionManager())
        .setKeepAliveStrategy((httpResponse, httpContext) -> -1)
        .build();
  }
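
A sketch of the retries-to-zero idea from this thread. The helper names are hypothetical; disableAutomaticRetries() is the Apache HttpClient 4.x builder option and retryOnConnectionFailure(false) the OkHttp one, so a server-side close should (in principle) surface as an error the test can assert on.

  // Hypothetical helpers (not part of this PR): clients with retries disabled, so a
  // server-side connection close surfaces as an exception the test can catch and assert on.
  private static CloseableHttpClient createNoRetryApacheClient() {
    return HttpClients.custom()
        .setConnectionManager(new BasicHttpClientConnectionManager())
        .disableAutomaticRetries()
        .build();
  }

  private static OkHttpClient createNoRetryOkHttpClient() {
    return new OkHttpClient.Builder()
        .retryOnConnectionFailure(false)
        .build();
  }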

225 changes: 225 additions & 0 deletions core/src/test/java/io/confluent/rest/ConnectionDurationFilterTest.java
@@ -0,0 +1,225 @@
/*
* Copyright 2024 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.confluent.rest;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.google.common.collect.ImmutableMap;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Configurable;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.BasicHttpClientConnectionManager;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.eclipse.jetty.server.Server;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;


/*
* Note on client support --
* The Apache HttpClient library v4.x does not support HTTP/2.0, so we use
* OkHttp for HTTP/2.0 tests.
*/
public class ConnectionDurationFilterTest {

  static Server server;
  private static final long MAX_CONNECTION_DURATION_MS = 100;

  @BeforeAll
  public static void setUp() throws Exception {
    MyAddressApplication application =
        new MyAddressApplication(
            new MyAddressConfig(
                ImmutableMap.of(
                    "listeners", "http://localhost:8080",
                    "http2.enabled", "true",
Member:

Is this applicable for HTTP/1.1?

Contributor Author:

Yes. "http2.enabled" simply controls whether HTTP/2.0 is supported or not; HTTP/1.1 is supported by default.

Member:

Shall we make the tests parameterized on HTTP/1.1 and HTTP/2?

"max.connection.duration.ms", String.valueOf(MAX_CONNECTION_DURATION_MS)))
);
server = application.createServer();
server.start();
}
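
A sketch of the parameterization suggested in the thread above, assuming the junit-jupiter-params artifact is available (it is not added by this PR). Since OkHttp can speak both protocols, a single test body can cover HTTP/1.1 and HTTP/2; the test name is made up.

  // Requires the org.junit.jupiter.params imports (ParameterizedTest, EnumSource); sketch only.
  @ParameterizedTest
  @EnumSource(value = okhttp3.Protocol.class, names = {"HTTP_1_1", "H2_PRIOR_KNOWLEDGE"})
  public void connectionDurationTest_Parameterized(okhttp3.Protocol protocol) throws Exception {
    OkHttpClient client = new OkHttpClient.Builder()
        .protocols(Collections.singletonList(protocol))
        .build();

    List<String> addressList = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
      Request request = new Request.Builder()
          .url("http://localhost:8080/whatsmyaddress")
          .build();
      try (Response response = client.newCall(request).execute()) {
        assertEquals(Status.OK.getStatusCode(), response.code());
        addressList.add(response.body().string());
      }
      // wait long enough for the connection to exceed the configured maximum duration
      Thread.sleep(MAX_CONNECTION_DURATION_MS + 10);
    }
    client.dispatcher().executorService().shutdown();

    // each request should have arrived over a fresh connection, i.e. a new client port
    assertEquals(5, addressList.stream().distinct().count());
  }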

  @AfterAll
  public static void tearDown() throws Exception {
    server.stop();
  }

  @Test
  public void connectionDurationTest_ApacheHttpClient_Ephemeral_HTTP1() throws Exception {

    CloseableHttpClient client = createPersistentClient();
Member:

Should this be creating an ephemeral client given the test is called ephemeral? Naming should be consistent.

Contributor Author:

Apologies, it's a typo. Will fix.

    List<String> addressList = new ArrayList<>();

    // Send a few requests with a delay between consecutive requests that is longer
    // than the connection duration threshold.
    // Each request should succeed and the server should return a different address for each request.
    for (int i=0; i<5; i++) {

      HttpGet request = createRequest(server.getURI());
      assertEquals("HTTP/1.1", request.getProtocolVersion().toString());
Member:

Why is the request HTTP/1.1 when the server has HTTP/2 enabled?

Contributor Author:

That's a limitation of the Apache HTTP client: version 4.x only supports HTTP/1.x, and HTTP/2 support was added in version 5.x (see here).

Using the 5.x HTTP client would require a major version upgrade of that package -- there are a number of major changes (including relocated packages, etc.) making the 4.x -> 5.x migration non-trivial (probably a separate future pull request on its own).

Member:

I think we have Jetty HTTP client code examples across this repo. You might want to take a look.


      CloseableHttpResponse response = client.execute(request);
      assertEquals(Status.OK.getStatusCode(), response.getStatusLine().getStatusCode());

      String responseBody = EntityUtils.toString(response.getEntity());
      assertTrue(responseBody.startsWith("127.0.0.1:"));

      response.close();
      addressList.add(responseBody);
      Thread.sleep(MAX_CONNECTION_DURATION_MS + 10);
    }
    client.close();

    // check that we actually used 5 different addresses under the hood
    assertEquals(5, addressList.stream().distinct().count());
  }

  @Test
  public void connectionDurationTest_ApacheHttpClient_Persistent_HTTP1() throws Exception {

    CloseableHttpClient client = createEphemeralClient();
Member:

Naming should be consistent, similar to comment on line 83.

Contributor Author:

Apologies, it's a typo. Will fix.

    List<String> addressList = new ArrayList<>();

    // Send a few requests with a delay between consecutive requests that is longer
    // than the connection duration threshold.
    // Each request should succeed and the server should return a different address for each request.
    for (int i=0; i<5; i++) {

      HttpGet request = createRequest(server.getURI());
      assertEquals("HTTP/1.1", request.getProtocolVersion().toString());

      CloseableHttpResponse response = client.execute(request);
      assertEquals(Status.OK.getStatusCode(), response.getStatusLine().getStatusCode());

      String responseBody = EntityUtils.toString(response.getEntity());
      assertTrue(responseBody.startsWith("127.0.0.1:"));

      response.close();
      addressList.add(responseBody);
      Thread.sleep(MAX_CONNECTION_DURATION_MS + 10);
Member:

Going by the test name, is this test creating "persistent" TCP connections? If so, should the request fail, given that the connection remains idle for longer than MAX_CONNECTION_DURATION_MS?

    }
    client.close();

    // check that we actually used 5 different addresses under the hood
    assertEquals(5, addressList.stream().distinct().count());
  }

  @Test
  public void connectionDurationTest_OkHttpClient_HTTP2() throws Exception {

    // explicitly set HTTP 2.0 protocol
    OkHttpClient client = new OkHttpClient.Builder()
        .protocols(Collections.singletonList(okhttp3.Protocol.H2_PRIOR_KNOWLEDGE))
        .build();

    List<String> addressList = new ArrayList<>();

    // Send a few requests with a delay between consecutive requests that is longer
    // than the connection duration threshold.
    // Each request should succeed and the server should return a different address for each request.
    for (int i=0; i<5; i++) {
      Request request = new Request.Builder()
          .url("http://localhost:8080/whatsmyaddress")
          .build();

      Response response = client.newCall(request).execute();
      // assert that the request was indeed http 2
      assertEquals("h2_prior_knowledge", response.protocol().toString());

      assertEquals(Status.OK.getStatusCode(), response.code());

      String responseBody = response.body().string();
      assertTrue(responseBody.startsWith("127.0.0.1:"));

      response.close();
      addressList.add(responseBody);
      Thread.sleep(MAX_CONNECTION_DURATION_MS + 10);
    }

    // shutdown the client
    client.dispatcher().executorService().shutdown();

    // check that we actually used 5 different addresses under the hood
    assertEquals(5, addressList.stream().distinct().count());
  }

  private static HttpGet createRequest(URI serverUri) {
    return new HttpGet(UriBuilder.fromUri(serverUri).path("/whatsmyaddress").build());
  }

  private static CloseableHttpClient createEphemeralClient() {
    return HttpClients.custom()
        .setConnectionManager(new BasicHttpClientConnectionManager())
        .setKeepAliveStrategy((httpResponse, httpContext) -> -1)
        .build();
  }

  private static CloseableHttpClient createPersistentClient() {
    return HttpClients.custom()
        .setConnectionManager(new PoolingHttpClientConnectionManager())
        .setKeepAliveStrategy((httpResponse, httpContext) -> 5000)
        .build();
  }
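
Following the author's note above that a -1 keep-alive duration does not actually make the client ephemeral, here is a sketch of a client that never reuses a connection; setConnectionReuseStrategy is an Apache HttpClient 4.x builder option, and the helper name is made up.

  // Hypothetical helper (not part of this PR): never reuse connections, so every
  // request is guaranteed to arrive over a brand new TCP connection.
  private static CloseableHttpClient createTrulyEphemeralClient() {
    return HttpClients.custom()
        .setConnectionManager(new BasicHttpClientConnectionManager())
        .setConnectionReuseStrategy((httpResponse, httpContext) -> false)
        .build();
  }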

  public static final class MyAddressApplication extends Application<MyAddressConfig> {

    public MyAddressApplication(MyAddressConfig config) {
      super(config);
    }

    @Override
    public void setupResources(Configurable<?> config, MyAddressConfig appConfig) {
      config.register(AddressResource.class);
    }
  }

  public static final class MyAddressConfig extends RestConfig {

    public MyAddressConfig(Map<String, String> configs) {
      super(baseConfigDef(), configs);
    }
  }

  @Path("/whatsmyaddress")
  public static final class AddressResource {

    @GET
    public String getAddress(@Context HttpServletRequest request) {
      return request.getRemoteAddr() + ":" + request.getRemotePort();
    }
  }
}