diff --git a/httpclient5-testing/pom.xml b/httpclient5-testing/pom.xml index 88baa4329f..c308b27344 100644 --- a/httpclient5-testing/pom.xml +++ b/httpclient5-testing/pom.xml @@ -77,6 +77,12 @@ httpclient5-fluent test + + com.kohlschutter.junixsocket + junixsocket-core + test + pom + org.junit.jupiter junit-jupiter-params @@ -153,4 +159,4 @@ - \ No newline at end of file + diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/sync/StandardTestClientBuilder.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/sync/StandardTestClientBuilder.java index 125473e5e8..15be8f914b 100644 --- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/sync/StandardTestClientBuilder.java +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/sync/StandardTestClientBuilder.java @@ -27,6 +27,7 @@ package org.apache.hc.client5.testing.extension.sync; +import java.nio.file.Path; import java.util.Collection; import org.apache.hc.client5.http.AuthenticationStrategy; @@ -36,6 +37,7 @@ import org.apache.hc.client5.http.auth.CredentialsProvider; import org.apache.hc.client5.http.classic.ExecChainHandler; import org.apache.hc.client5.http.config.ConnectionConfig; +import org.apache.hc.client5.http.config.RequestConfig; import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder; @@ -157,6 +159,14 @@ public TestClientBuilder setDefaultCredentialsProvider(final CredentialsProvider return this; } + @Override + public TestClientBuilder setUnixDomainSocket(final Path unixDomainSocket) { + this.clientBuilder.setDefaultRequestConfig(RequestConfig.custom() + .setUnixDomainSocket(unixDomainSocket) + .build()); + return this; + } + @Override public TestClient build() throws Exception { final HttpClientConnectionManager connectionManagerCopy = connectionManager != null ? connectionManager : diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/sync/TestClientBuilder.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/sync/TestClientBuilder.java index 459124693f..775ff47c4b 100644 --- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/sync/TestClientBuilder.java +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/sync/TestClientBuilder.java @@ -27,6 +27,7 @@ package org.apache.hc.client5.testing.extension.sync; +import java.nio.file.Path; import java.util.Collection; import org.apache.hc.client5.http.AuthenticationStrategy; @@ -103,6 +104,10 @@ default TestClientBuilder setDefaultCredentialsProvider(CredentialsProvider cred throw new UnsupportedOperationException("Operation not supported by " + getProtocolLevel()); } + default TestClientBuilder setUnixDomainSocket(Path unixDomainSocket) { + throw new UnsupportedOperationException("Operation not supported by " + getProtocolLevel()); + } + TestClient build() throws Exception; } diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/sync/TestClientResources.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/sync/TestClientResources.java index 2dcb6356ec..b18a11f4e1 100644 --- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/sync/TestClientResources.java +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/sync/TestClientResources.java @@ -50,6 +50,7 @@ public class TestClientResources implements AfterEachCallback { private TestServer server; private TestClient client; + private UnixDomainProxyServer udsProxy; public TestClientResources(final URIScheme scheme, final ClientProtocolLevel clientProtocolLevel, final Timeout timeout) { this.scheme = scheme != null ? scheme : URIScheme.HTTP; @@ -74,6 +75,9 @@ public void afterEach(final ExtensionContext extensionContext) { if (client != null) { client.close(CloseMode.GRACEFUL); } + if (udsProxy != null) { + udsProxy.close(); + } if (server != null) { server.shutdown(CloseMode.IMMEDIATE); } @@ -99,6 +103,15 @@ public TestServer server() throws Exception { return server; } + public UnixDomainProxyServer udsProxy() throws Exception { + if (udsProxy == null) { + final TestServer testServer = server(); + final int port = testServer.getServerAddress().getPort(); + udsProxy = new UnixDomainProxyServer(port); + } + return udsProxy; + } + public void configureClient(final Consumer clientCustomizer) { Asserts.check(client == null, "Client is already running and cannot be changed"); clientCustomizer.accept(clientBuilder); diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/sync/TestServer.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/sync/TestServer.java index 039fcb7b71..76836309b8 100644 --- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/sync/TestServer.java +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/sync/TestServer.java @@ -43,6 +43,7 @@ public class TestServer { private final Http1Config http1Config; private final HttpProcessor httpProcessor; private final Decorator exchangeHandlerDecorator; + private volatile InetSocketAddress serverAddress; TestServer( final ClassicTestServer server, @@ -64,7 +65,14 @@ public InetSocketAddress start() throws IOException { server.configure(exchangeHandlerDecorator); server.configure(httpProcessor); server.start(); - return new InetSocketAddress(server.getInetAddress(), server.getPort()); + serverAddress = new InetSocketAddress(server.getInetAddress(), server.getPort()); + return serverAddress; } + public InetSocketAddress getServerAddress() { + if (serverAddress == null) { + throw new IllegalStateException("Server has not been started"); + } + return serverAddress; + } } diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/sync/UnixDomainProxyServer.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/sync/UnixDomainProxyServer.java new file mode 100644 index 0000000000..048ad41063 --- /dev/null +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/sync/UnixDomainProxyServer.java @@ -0,0 +1,137 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.client5.testing.extension.sync; + +import org.newsclub.net.unix.AFUNIXServerSocket; +import org.newsclub.net.unix.AFUNIXSocket; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static java.util.concurrent.CompletableFuture.supplyAsync; + +public final class UnixDomainProxyServer { + private final int port; + private final ExecutorService executorService; + private final Path socketPath; + private final CountDownLatch serverReady = new CountDownLatch(1); + private volatile AFUNIXServerSocket serverSocket; + + public UnixDomainProxyServer(final int port) { + this.port = port; + this.executorService = Executors.newCachedThreadPool(); + this.socketPath = (new File("proxy.sock")).toPath(); + } + + public void start() { + executorService.submit(this::runUdsProxy); + try { + serverReady.await(); + } catch (final InterruptedException ex) { + throw new RuntimeException(ex); + } + } + + public Path getSocketPath() { + return socketPath; + } + + public void close() { + try { + serverSocket.close(); + executorService.shutdownNow(); + if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { + throw new RuntimeException("Failed to shut down"); + } + Files.deleteIfExists(socketPath); + } catch (final InterruptedException | IOException ex) { + throw new RuntimeException(ex); + } + } + + private void runUdsProxy() { + try { + Files.deleteIfExists(socketPath); + } catch (final IOException ignore) { + } + + try { + try (final AFUNIXServerSocket server = AFUNIXServerSocket.bindOn(socketPath, true)) { + this.serverSocket = server; + serverReady.countDown(); + serveRequests(server); + } catch (final Throwable ignore) { + } + } catch (final Throwable t) { + serverReady.countDown(); + throw t; + } + } + + private void serveRequests(final AFUNIXServerSocket server) throws IOException { + while (true) { + final AFUNIXSocket udsClient = server.accept(); + final Socket tcpSocket = new Socket("localhost", port); + final CompletableFuture f1 = supplyAsync(() -> pipe(udsClient, tcpSocket), executorService); + final CompletableFuture f2 = supplyAsync(() -> pipe(tcpSocket, udsClient), executorService); + CompletableFuture.allOf(f1, f2).whenComplete((result, ex) -> { + try { + udsClient.close(); + tcpSocket.close(); + } catch (final IOException ignore) { + } + }); + } + } + + private Void pipe(final Socket inputSocket, final Socket outputSocket) { + try ( + final InputStream in = inputSocket.getInputStream(); + final OutputStream out = outputSocket.getOutputStream() + ) { + final byte[] buf = new byte[8192]; + int len; + while ((len = in.read(buf)) != -1) { + out.write(buf, 0, len); + out.flush(); + } + } catch (final IOException ignore) { + } + return null; + } +} diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/AbstractIntegrationTestBase.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/AbstractIntegrationTestBase.java index eb83f3dcdd..6e6b24b893 100644 --- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/AbstractIntegrationTestBase.java +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/AbstractIntegrationTestBase.java @@ -28,6 +28,7 @@ package org.apache.hc.client5.testing.sync; import java.net.InetSocketAddress; +import java.nio.file.Path; import java.util.function.Consumer; import org.apache.hc.client5.testing.extension.sync.ClientProtocolLevel; @@ -47,9 +48,16 @@ abstract class AbstractIntegrationTestBase { @RegisterExtension private final TestClientResources testResources; + private final boolean useUnixDomainSocket; protected AbstractIntegrationTestBase(final URIScheme scheme, final ClientProtocolLevel clientProtocolLevel) { + this(scheme, clientProtocolLevel, false); + } + + protected AbstractIntegrationTestBase( + final URIScheme scheme, final ClientProtocolLevel clientProtocolLevel, final boolean useUnixDomainSocket) { this.testResources = new TestClientResources(scheme, clientProtocolLevel, TIMEOUT); + this.useUnixDomainSocket = useUnixDomainSocket; } public URIScheme scheme() { @@ -67,6 +75,9 @@ public void configureServer(final Consumer serverCustomizer public HttpHost startServer() throws Exception { final TestServer server = testResources.server(); final InetSocketAddress inetSocketAddress = server.start(); + if (useUnixDomainSocket) { + testResources.udsProxy().start(); + } return new HttpHost(testResources.scheme().id, "localhost", inetSocketAddress.getPort()); } @@ -75,6 +86,12 @@ public void configureClient(final Consumer clientCustomizer) } public TestClient client() throws Exception { + if (useUnixDomainSocket) { + final Path socketPath = testResources.udsProxy().getSocketPath(); + testResources.configureClient(builder -> { + builder.setUnixDomainSocket(socketPath); + }); + } return testResources.client(); } diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/TestConnectionReuse.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/TestConnectionReuse.java index 7e47520563..02b6aaeb0c 100644 --- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/TestConnectionReuse.java +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/TestConnectionReuse.java @@ -32,9 +32,12 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.hc.client5.http.classic.methods.HttpGet; import org.apache.hc.client5.http.classic.methods.HttpPost; +import org.apache.hc.client5.http.config.RequestConfig; import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager; import org.apache.hc.client5.testing.classic.RandomHandler; @@ -51,16 +54,18 @@ import org.apache.hc.core5.http.HttpResponseInterceptor; import org.apache.hc.core5.http.URIScheme; import org.apache.hc.core5.http.impl.HttpProcessors; +import org.apache.hc.core5.http.io.HttpClientResponseHandler; import org.apache.hc.core5.http.io.entity.EntityUtils; import org.apache.hc.core5.http.io.entity.InputStreamEntity; import org.apache.hc.core5.http.protocol.HttpContext; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; -class TestConnectionReuse extends AbstractIntegrationTestBase { - - public TestConnectionReuse() { - super(URIScheme.HTTP, ClientProtocolLevel.STANDARD); +abstract class AbstractTestConnectionReuse extends AbstractIntegrationTestBase { + protected AbstractTestConnectionReuse(final URIScheme scheme, final ClientProtocolLevel clientProtocolLevel, + final boolean useUnixDomainSocket) { + super(scheme, clientProtocolLevel, useUnixDomainSocket); } @Test @@ -295,14 +300,8 @@ public WorkerThread( final URI requestURI, final int repetitions, final boolean forceClose) { - super(); - this.httpclient = httpclient; - this.target = target; - this.forceClose = forceClose; - this.requests = new ArrayList<>(repetitions); - for (int i = 0; i < repetitions; i++) { - requests.add(new HttpGet(requestURI)); - } + this(httpclient, target, forceClose, + Stream.generate(() -> new HttpGet(requestURI)).limit(repetitions).collect(Collectors.toList())); } public WorkerThread( @@ -357,5 +356,54 @@ public void process( } } } +} + +public class TestConnectionReuse { + @Nested + class Tcp extends AbstractTestConnectionReuse { + public Tcp() { + super(URIScheme.HTTP, ClientProtocolLevel.STANDARD, false); + } + } + + @Nested + class Uds extends AbstractTestConnectionReuse { + public Uds() { + super(URIScheme.HTTP, ClientProtocolLevel.STANDARD, true); + } + @Test + void testMixedTcpAndUdsConnectionPooling() throws Exception { + configureServer(bootstrap -> bootstrap + .register("/random/*", new RandomHandler())); + final HttpHost target = startServer(); + + final TestClient client = client(); + final PoolingHttpClientConnectionManager connManager = client.getConnectionManager(); + connManager.setMaxTotal(5); + connManager.setDefaultMaxPerRoute(5); + + final URI requestUri = new URI("/random/2000"); + final HttpGet udsRequest = new HttpGet(requestUri); + final HttpGet tcpRequest = new HttpGet(requestUri); + tcpRequest.setConfig(RequestConfig.DEFAULT); + + final HttpClientResponseHandler consumer = response -> { + EntityUtils.consume(response.getEntity()); + return null; + }; + client.execute(target, udsRequest, consumer); + client.execute(target, tcpRequest, consumer); + + // Expect leased connections to be returned + Assertions.assertEquals(0, connManager.getTotalStats().getLeased()); + // Expect two connections in the pool (one UDS, one TCP) + Assertions.assertEquals(2, connManager.getTotalStats().getAvailable()); + Assertions.assertEquals(1, connManager.getRoutes() + .stream() + .filter(route -> route.getUnixDomainSocket() != null) + .count()); + } + } } + diff --git a/httpclient5/pom.xml b/httpclient5/pom.xml index 2cd51e18f2..e8b4b57957 100644 --- a/httpclient5/pom.xml +++ b/httpclient5/pom.xml @@ -82,6 +82,12 @@ dec true + + com.kohlschutter.junixsocket + junixsocket-core + test + pom + org.junit.jupiter junit-jupiter @@ -185,4 +191,4 @@ - \ No newline at end of file + diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/HttpRoute.java b/httpclient5/src/main/java/org/apache/hc/client5/http/HttpRoute.java index 2edca7820f..02d6418d65 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/HttpRoute.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/HttpRoute.java @@ -29,6 +29,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -65,6 +66,9 @@ public final class HttpRoute implements RouteInfo, Cloneable { /** The proxy servers, if any. Never null. */ private final List proxyChain; + /** The path to the UDS to connect to, if any. */ + private final Path unixDomainSocket; + /** Whether the the route is tunnelled through the proxy. */ private final TunnelType tunnelled; @@ -78,6 +82,7 @@ public final class HttpRoute implements RouteInfo, Cloneable { final NamedEndpoint targetName, final InetAddress local, final List proxies, + final Path unixDomainSocket, final boolean secure, final TunnelType tunnelled, final LayerType layered) { @@ -86,6 +91,7 @@ public final class HttpRoute implements RouteInfo, Cloneable { this.targetName = targetName; this.targetHost = targetHost; this.localAddress = local; + this.unixDomainSocket = unixDomainSocket; if (proxies != null && !proxies.isEmpty()) { this.proxyChain = new ArrayList<>(proxies); } else { @@ -97,6 +103,23 @@ public final class HttpRoute implements RouteInfo, Cloneable { this.secure = secure; this.tunnelled = tunnelled != null ? tunnelled : TunnelType.PLAIN; this.layered = layered != null ? layered : LayerType.PLAIN; + if (this.unixDomainSocket != null) { + validateUdsArguments(); + } + } + + private void validateUdsArguments() { + if (this.secure) { + throw new UnsupportedOperationException("HTTPS is not supported over a UDS connection"); + } else if (this.localAddress != null) { + throw new UnsupportedOperationException("A localAddress cannot be specified for a UDS connection"); + } else if (this.proxyChain != null) { + throw new UnsupportedOperationException("Proxies are not supported over a UDS connection"); + } else if (this.layered != LayerType.PLAIN) { + throw new UnsupportedOperationException("Layering is not supported over a UDS connection"); + } else if (this.tunnelled != TunnelType.PLAIN) { + throw new UnsupportedOperationException("Tunnelling is not supported over a UDS connection"); + } } /** @@ -114,7 +137,7 @@ public final class HttpRoute implements RouteInfo, Cloneable { */ public HttpRoute(final HttpHost target, final InetAddress local, final HttpHost[] proxies, final boolean secure, final TunnelType tunnelled, final LayerType layered) { - this(target, null, local, proxies != null ? Arrays.asList(proxies) : null, + this(target, null, local, proxies != null ? Arrays.asList(proxies) : null, null, secure, tunnelled, layered); } @@ -136,7 +159,7 @@ public HttpRoute(final HttpHost target, final InetAddress local, final HttpHost[ */ public HttpRoute(final HttpHost target, final NamedEndpoint targetName, final InetAddress local, final HttpHost[] proxies, final boolean secure, final TunnelType tunnelled, final LayerType layered) { - this(target, targetName, local, proxies != null ? Arrays.asList(proxies) : null, + this(target, targetName, local, proxies != null ? Arrays.asList(proxies) : null, null, secure, tunnelled, layered); } @@ -159,7 +182,7 @@ public HttpRoute(final HttpHost target, final NamedEndpoint targetName, final In */ public HttpRoute(final HttpHost target, final InetAddress local, final HttpHost proxy, final boolean secure, final TunnelType tunnelled, final LayerType layered) { - this(target, null, local, proxy != null ? Collections.singletonList(proxy) : null, + this(target, null, local, proxy != null ? Collections.singletonList(proxy) : null, null, secure, tunnelled, layered); } @@ -174,7 +197,7 @@ public HttpRoute(final HttpHost target, final InetAddress local, final HttpHost * {@code false} otherwise */ public HttpRoute(final HttpHost target, final InetAddress local, final boolean secure) { - this(target, null, local, Collections.emptyList(), secure, TunnelType.PLAIN, LayerType.PLAIN); + this(target, null, local, Collections.emptyList(), null, secure, TunnelType.PLAIN, LayerType.PLAIN); } /** @@ -190,7 +213,19 @@ public HttpRoute(final HttpHost target, final InetAddress local, final boolean s * @since 5.4 */ public HttpRoute(final HttpHost target, final NamedEndpoint targetName, final InetAddress local, final boolean secure) { - this(target, targetName, local, Collections.emptyList(), secure, TunnelType.PLAIN, LayerType.PLAIN); + this(target, targetName, local, Collections.emptyList(), null, secure, TunnelType.PLAIN, LayerType.PLAIN); + } + + /** + * Creates a new direct route that connects over a Unix domain socket rather than TCP. + * + * @param target the host to which to route + * @param unixDomainSocket the path to the Unix domain socket + * + * @since 5.6 + */ + public HttpRoute(final HttpHost target, final Path unixDomainSocket) { + this(target, null, null, Collections.emptyList(), unixDomainSocket, false, TunnelType.PLAIN, LayerType.PLAIN); } /** @@ -199,7 +234,7 @@ public HttpRoute(final HttpHost target, final NamedEndpoint targetName, final In * @param target the host to which to route */ public HttpRoute(final HttpHost target) { - this(target, null, null, Collections.emptyList(), false, TunnelType.PLAIN, LayerType.PLAIN); + this(target, null, null, Collections.emptyList(), null, false, TunnelType.PLAIN, LayerType.PLAIN); } /** @@ -220,7 +255,7 @@ public HttpRoute(final HttpHost target) { */ public HttpRoute(final HttpHost target, final NamedEndpoint targetName, final InetAddress local, final HttpHost proxy, final boolean secure) { - this(target, targetName, local, Collections.singletonList(Args.notNull(proxy, "Proxy host")), secure, + this(target, targetName, local, Collections.singletonList(Args.notNull(proxy, "Proxy host")), null, secure, secure ? TunnelType.TUNNELLED : TunnelType.PLAIN, secure ? LayerType.LAYERED : LayerType.PLAIN); } @@ -240,7 +275,7 @@ public HttpRoute(final HttpHost target, final NamedEndpoint targetName, final In */ public HttpRoute(final HttpHost target, final InetAddress local, final HttpHost proxy, final boolean secure) { - this(target, null, local, Collections.singletonList(Args.notNull(proxy, "Proxy host")), secure, + this(target, null, local, Collections.singletonList(Args.notNull(proxy, "Proxy host")), null, secure, secure ? TunnelType.TUNNELLED : TunnelType.PLAIN, secure ? LayerType.LAYERED : LayerType.PLAIN); } @@ -300,6 +335,11 @@ public HttpHost getProxyHost() { return proxyChain != null && !this.proxyChain.isEmpty() ? this.proxyChain.get(0) : null; } + @Override + public Path getUnixDomainSocket() { + return this.unixDomainSocket; + } + @Override public TunnelType getTunnelType() { return this.tunnelled; @@ -348,7 +388,8 @@ public boolean equals(final Object obj) { Objects.equals(this.targetHost, that.targetHost) && Objects.equals(this.targetName, that.targetName) && Objects.equals(this.localAddress, that.localAddress) && - Objects.equals(this.proxyChain, that.proxyChain); + Objects.equals(this.proxyChain, that.proxyChain) && + Objects.equals(this.unixDomainSocket, that.unixDomainSocket); } return false; } @@ -370,6 +411,9 @@ public int hashCode() { hash = LangUtils.hashCode(hash, element); } } + if (this.unixDomainSocket != null) { + hash = LangUtils.hashCode(hash, unixDomainSocket); + } hash = LangUtils.hashCode(hash, this.secure); hash = LangUtils.hashCode(hash, this.tunnelled); hash = LangUtils.hashCode(hash, this.layered); @@ -387,6 +431,8 @@ public String toString() { if (this.localAddress != null) { cab.append(this.localAddress); cab.append("->"); + } else if (unixDomainSocket != null) { + cab.append(unixDomainSocket).append("->"); } cab.append('{'); if (this.tunnelled == TunnelType.TUNNELLED) { diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/RouteInfo.java b/httpclient5/src/main/java/org/apache/hc/client5/http/RouteInfo.java index 4d3ba116a5..e310bd0684 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/RouteInfo.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/RouteInfo.java @@ -28,6 +28,7 @@ package org.apache.hc.client5.http; import java.net.InetAddress; +import java.nio.file.Path; import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.net.NamedEndpoint; @@ -89,6 +90,15 @@ default NamedEndpoint getTargetName() { */ InetAddress getLocalAddress(); + /** + * Obtains the Unix domain socket through which to connect to the target host. + * + * @return the path to the Unix domain socket, or {@code null} if none + */ + default Path getUnixDomainSocket() { + return null; + } + /** * Obtains the number of hops in this route. * A direct route has one hop. A route through a proxy has two hops. diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/RouteTracker.java b/httpclient5/src/main/java/org/apache/hc/client5/http/RouteTracker.java index 4857bb465c..173d941c07 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/RouteTracker.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/RouteTracker.java @@ -28,6 +28,7 @@ package org.apache.hc.client5.http; import java.net.InetAddress; +import java.nio.file.Path; import java.util.Objects; import org.apache.hc.core5.http.HttpHost; @@ -51,6 +52,9 @@ public final class RouteTracker implements RouteInfo, Cloneable { */ private final InetAddress localAddress; + /** The Unix domain socket to connect through, if any. */ + private final Path unixDomainSocket; + // the attributes above are fixed at construction time // now follow attributes that indicate the established route @@ -78,9 +82,24 @@ public final class RouteTracker implements RouteInfo, Cloneable { * {@code null} for the default */ public RouteTracker(final HttpHost target, final InetAddress local) { + this(target, local, null); + } + + /** + * Creates a new route tracker. + * The target and origin need to be specified at creation time. + * + * @param target the host to which to route + * @param local the local address to route from, or + * {@code null} for the default + * @param unixDomainSocket the path to the Unix domain socket + * through which to connect, or {@code null} + */ + public RouteTracker(final HttpHost target, final InetAddress local, final Path unixDomainSocket) { Args.notNull(target, "Target host"); this.targetHost = target; this.localAddress = local; + this.unixDomainSocket = unixDomainSocket; this.tunnelled = TunnelType.PLAIN; this.layered = LayerType.PLAIN; } @@ -104,7 +123,7 @@ public void reset() { * @param route the route to track */ public RouteTracker(final HttpRoute route) { - this(route.getTargetHost(), route.getLocalAddress()); + this(route.getTargetHost(), route.getLocalAddress(), route.getUnixDomainSocket()); } /** @@ -194,6 +213,11 @@ public InetAddress getLocalAddress() { return this.localAddress; } + @Override + public Path getUnixDomainSocket() { + return this.unixDomainSocket; + } + @Override public int getHopCount() { int hops = 0; @@ -265,10 +289,15 @@ public boolean isSecure() { * {@code null} if nothing has been tracked so far */ public HttpRoute toRoute() { - return !this.connected ? - null : new HttpRoute(this.targetHost, this.localAddress, - this.proxyChain, this.secure, - this.tunnelled, this.layered); + if (!this.connected) { + return null; + } else if (this.unixDomainSocket != null) { + return new HttpRoute(this.targetHost, this.unixDomainSocket); + } else { + return new HttpRoute(this.targetHost, this.localAddress, + this.proxyChain, this.secure, + this.tunnelled, this.layered); + } } /** @@ -297,6 +326,7 @@ public boolean equals(final Object o) { this.layered == that.layered && Objects.equals(this.targetHost, that.targetHost) && Objects.equals(this.localAddress, that.localAddress) && + Objects.equals(this.unixDomainSocket, that.unixDomainSocket) && Objects.equals(this.proxyChain, that.proxyChain); } @@ -313,6 +343,7 @@ public int hashCode() { int hash = LangUtils.HASH_SEED; hash = LangUtils.hashCode(hash, this.targetHost); hash = LangUtils.hashCode(hash, this.localAddress); + hash = LangUtils.hashCode(hash, this.unixDomainSocket); if (this.proxyChain != null) { for (final HttpHost element : this.proxyChain) { hash = LangUtils.hashCode(hash, element); @@ -338,6 +369,8 @@ public String toString() { if (this.localAddress != null) { cab.append(this.localAddress); cab.append("->"); + } else if (this.unixDomainSocket != null) { + cab.append(this.unixDomainSocket).append("->"); } cab.append('{'); if (this.connected) { diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/config/RequestConfig.java b/httpclient5/src/main/java/org/apache/hc/client5/http/config/RequestConfig.java index d0d70fb482..54ce77634d 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/config/RequestConfig.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/config/RequestConfig.java @@ -27,6 +27,7 @@ package org.apache.hc.client5.http.config; +import java.nio.file.Path; import java.util.Collection; import java.util.concurrent.TimeUnit; @@ -63,13 +64,14 @@ public class RequestConfig implements Cloneable { private final boolean contentCompressionEnabled; private final boolean hardCancellationEnabled; private final boolean protocolUpgradeEnabled; + private final Path unixDomainSocket; /** * Intended for CDI compatibility */ protected RequestConfig() { this(false, null, null, false, false, 0, false, null, null, - DEFAULT_CONNECTION_REQUEST_TIMEOUT, null, null, DEFAULT_CONN_KEEP_ALIVE, false, false, false); + DEFAULT_CONNECTION_REQUEST_TIMEOUT, null, null, DEFAULT_CONN_KEEP_ALIVE, false, false, false, null); } RequestConfig( @@ -88,7 +90,8 @@ protected RequestConfig() { final TimeValue connectionKeepAlive, final boolean contentCompressionEnabled, final boolean hardCancellationEnabled, - final boolean protocolUpgradeEnabled) { + final boolean protocolUpgradeEnabled, + final Path unixDomainSocket) { super(); this.expectContinueEnabled = expectContinueEnabled; this.proxy = proxy; @@ -106,6 +109,7 @@ protected RequestConfig() { this.contentCompressionEnabled = contentCompressionEnabled; this.hardCancellationEnabled = hardCancellationEnabled; this.protocolUpgradeEnabled = protocolUpgradeEnabled; + this.unixDomainSocket = unixDomainSocket; } /** @@ -227,6 +231,13 @@ public boolean isProtocolUpgradeEnabled() { return protocolUpgradeEnabled; } + /** + * @see Builder#setUnixDomainSocket(Path) + */ + public Path getUnixDomainSocket() { + return unixDomainSocket; + } + @Override protected RequestConfig clone() throws CloneNotSupportedException { return (RequestConfig) super.clone(); @@ -252,6 +263,7 @@ public String toString() { builder.append(", contentCompressionEnabled=").append(contentCompressionEnabled); builder.append(", hardCancellationEnabled=").append(hardCancellationEnabled); builder.append(", protocolUpgradeEnabled=").append(protocolUpgradeEnabled); + builder.append(", unixDomainSocket=").append(unixDomainSocket); builder.append("]"); return builder.toString(); } @@ -277,7 +289,8 @@ public static RequestConfig.Builder copy(final RequestConfig config) { .setConnectionKeepAlive(config.getConnectionKeepAlive()) .setContentCompressionEnabled(config.isContentCompressionEnabled()) .setHardCancellationEnabled(config.isHardCancellationEnabled()) - .setProtocolUpgradeEnabled(config.isProtocolUpgradeEnabled()); + .setProtocolUpgradeEnabled(config.isProtocolUpgradeEnabled()) + .setUnixDomainSocket(config.getUnixDomainSocket()); } public static class Builder { @@ -298,6 +311,7 @@ public static class Builder { private boolean contentCompressionEnabled; private boolean hardCancellationEnabled; private boolean protocolUpgradeEnabled; + private Path unixDomainSocket; Builder() { super(); @@ -629,6 +643,25 @@ public Builder setProtocolUpgradeEnabled(final boolean protocolUpgradeEnabled) { return this; } + /** + * Sets the Unix Domain Socket path to use for the connection. + *

+ * When set, the connection will use the specified Unix Domain Socket + * instead of a TCP socket. This is useful for communicating with local + * services like Docker or systemd. + *

+ *

+ * Default: {@code null} + *

+ * + * @return this instance. + * @since 5.6 + */ + public Builder setUnixDomainSocket(final Path unixDomainSocket) { + this.unixDomainSocket = unixDomainSocket; + return this; + } + public RequestConfig build() { return new RequestConfig( expectContinueEnabled, @@ -646,7 +679,8 @@ public RequestConfig build() { connectionKeepAlive != null ? connectionKeepAlive : DEFAULT_CONN_KEEP_ALIVE, contentCompressionEnabled, hardCancellationEnabled, - protocolUpgradeEnabled); + protocolUpgradeEnabled, + unixDomainSocket); } } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/DefaultHttpClientConnectionOperator.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/DefaultHttpClientConnectionOperator.java index f81bd797db..bb764c9533 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/DefaultHttpClientConnectionOperator.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/DefaultHttpClientConnectionOperator.java @@ -31,6 +31,8 @@ import java.net.Proxy; import java.net.Socket; import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.file.Path; import java.util.Collections; import java.util.List; @@ -46,6 +48,7 @@ import org.apache.hc.client5.http.io.DetachedSocketFactory; import org.apache.hc.client5.http.io.HttpClientConnectionOperator; import org.apache.hc.client5.http.io.ManagedHttpClientConnection; +import org.apache.hc.client5.http.socket.UnixDomainSocketFactory; import org.apache.hc.client5.http.ssl.TlsSocketStrategy; import org.apache.hc.core5.annotation.Contract; import org.apache.hc.core5.annotation.Internal; @@ -80,6 +83,7 @@ public class DefaultHttpClientConnectionOperator implements HttpClientConnection static final DetachedSocketFactory PLAIN_SOCKET_FACTORY = socksProxy -> socksProxy == null ? new Socket() : new Socket(socksProxy); private final DetachedSocketFactory detachedSocketFactory; + private final UnixDomainSocketFactory unixDomainSocketFactory = UnixDomainSocketFactory.getSocketFactory(); private final Lookup tlsSocketStrategyLookup; private final SchemePortResolver schemePortResolver; private final DnsResolver dnsResolver; @@ -153,6 +157,21 @@ public void connect( final SocketConfig socketConfig, final Object attachment, final HttpContext context) throws IOException { + connect(conn, endpointHost, endpointName, null, localAddress, connectTimeout, socketConfig, attachment, + context); + } + + @Override + public void connect( + final ManagedHttpClientConnection conn, + final HttpHost endpointHost, + final NamedEndpoint endpointName, + final Path unixDomainSocket, + final InetSocketAddress localAddress, + final Timeout connectTimeout, + final SocketConfig socketConfig, + final Object attachment, + final HttpContext context) throws IOException { Args.notNull(conn, "Connection"); Args.notNull(endpointHost, "Host"); @@ -162,6 +181,10 @@ public void connect( final Timeout soTimeout = socketConfig.getSoTimeout(); final SocketAddress socksProxyAddress = socketConfig.getSocksProxyAddress(); final Proxy socksProxy = socksProxyAddress != null ? new Proxy(Proxy.Type.SOCKS, socksProxyAddress) : null; + if (unixDomainSocket != null) { + connectToUnixDomainSocket(conn, endpointHost, unixDomainSocket, connectTimeout, socketConfig, context, soTimeout); + return; + } final List remoteAddresses; if (endpointHost.getAddress() != null) { @@ -185,23 +208,7 @@ public void connect( socket.bind(localAddress); } conn.bind(socket); - if (soTimeout != null) { - socket.setSoTimeout(soTimeout.toMillisecondsIntBound()); - } - socket.setReuseAddress(socketConfig.isSoReuseAddress()); - socket.setTcpNoDelay(socketConfig.isTcpNoDelay()); - socket.setKeepAlive(socketConfig.isSoKeepAlive()); - if (socketConfig.getRcvBufSize() > 0) { - socket.setReceiveBufferSize(socketConfig.getRcvBufSize()); - } - if (socketConfig.getSndBufSize() > 0) { - socket.setSendBufferSize(socketConfig.getSndBufSize()); - } - - final int linger = socketConfig.getSoLinger().toMillisecondsIntBound(); - if (linger >= 0) { - socket.setSoLinger(true, linger); - } + configureSocket(socket, socketConfig, soTimeout); socket.connect(remoteAddress, TimeValue.isPositive(connectTimeout) ? connectTimeout.toMillisecondsIntBound() : 0); conn.bind(socket); onAfterSocketConnect(context, endpointHost); @@ -242,6 +249,64 @@ public void connect( } } + private void connectToUnixDomainSocket( + final ManagedHttpClientConnection conn, + final HttpHost endpointHost, + final Path unixDomainSocket, + final Timeout connectTimeout, + final SocketConfig socketConfig, + final HttpContext context, + final Timeout soTimeout) throws IOException { + onBeforeSocketConnect(context, endpointHost); + if (LOG.isDebugEnabled()) { + LOG.debug("{} connecting to {} ({})", endpointHost, unixDomainSocket, connectTimeout); + } + final Socket newSocket = unixDomainSocketFactory.createSocket(); + try { + conn.bind(newSocket); + final Socket socket = unixDomainSocketFactory.connectSocket(newSocket, unixDomainSocket, + connectTimeout); + conn.bind(socket); + configureSocket(socket, socketConfig, soTimeout); + onAfterSocketConnect(context, endpointHost); + if (LOG.isDebugEnabled()) { + LOG.debug("{} {} connected to {}", ConnPoolSupport.getId(conn), endpointHost, unixDomainSocket); + } + conn.setSocketTimeout(soTimeout); + } catch (final RuntimeException ex) { + Closer.closeQuietly(newSocket); + throw ex; + } catch (final IOException ex) { + Closer.closeQuietly(newSocket); + if (LOG.isDebugEnabled()) { + LOG.debug("{} connection to {} failed ({}); terminating operation", endpointHost, unixDomainSocket, + ex.getClass()); + } + throw ex; + } + } + + private static void configureSocket(final Socket socket, final SocketConfig socketConfig, + final Timeout soTimeout) throws SocketException { + if (soTimeout != null) { + socket.setSoTimeout(soTimeout.toMillisecondsIntBound()); + } + socket.setReuseAddress(socketConfig.isSoReuseAddress()); + socket.setTcpNoDelay(socketConfig.isTcpNoDelay()); + socket.setKeepAlive(socketConfig.isSoKeepAlive()); + if (socketConfig.getRcvBufSize() > 0) { + socket.setReceiveBufferSize(socketConfig.getRcvBufSize()); + } + if (socketConfig.getSndBufSize() > 0) { + socket.setSendBufferSize(socketConfig.getSndBufSize()); + } + + final int linger = socketConfig.getSoLinger().toMillisecondsIntBound(); + if (linger >= 0) { + socket.setSoLinger(true, linger); + } + } + @Override public void upgrade( final ManagedHttpClientConnection conn, diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java index 8d509209a5..f43296767f 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java @@ -27,6 +27,7 @@ package org.apache.hc.client5.http.impl.io; import java.io.IOException; +import java.nio.file.Path; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -479,6 +480,7 @@ public void connect(final ConnectionEndpoint endpoint, final TimeValue timeout, poolEntry.assignConnection(connFactory.createConnection(null)); } final HttpRoute route = poolEntry.getRoute(); + final Path unixDomainSocket = route.getUnixDomainSocket(); final HttpHost firstHop = route.getProxyHost() != null ? route.getProxyHost() : route.getTargetHost(); final SocketConfig socketConfig = resolveSocketConfig(route); final ConnectionConfig connectionConfig = resolveConnectionConfig(route); @@ -491,6 +493,7 @@ public void connect(final ConnectionEndpoint endpoint, final TimeValue timeout, conn, firstHop, route.getTargetName(), + unixDomainSocket, route.getLocalSocketAddress(), connectTimeout, socketConfig, diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/DefaultAsyncClientConnectionOperator.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/DefaultAsyncClientConnectionOperator.java index 1d3531373f..28515675e7 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/DefaultAsyncClientConnectionOperator.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/DefaultAsyncClientConnectionOperator.java @@ -27,9 +27,11 @@ package org.apache.hc.client5.http.impl.nio; +import java.lang.reflect.Method; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.nio.file.Path; import java.util.concurrent.Future; import org.apache.hc.client5.http.DnsResolver; @@ -90,10 +92,25 @@ public Future connect( attachment, null, callback); } + @Override + public Future connect( + final ConnectionInitiator connectionInitiator, + final HttpHost endpointHost, + final NamedEndpoint endpointName, + final SocketAddress localAddress, + final Timeout connectTimeout, + final Object attachment, + final HttpContext context, + final FutureCallback callback) { + return connect(connectionInitiator, endpointHost, null, endpointName, localAddress, connectTimeout, attachment, + context, callback); + } + @Override public Future connect( final ConnectionInitiator connectionInitiator, final HttpHost endpointHost, + final Path unixDomainSocket, final NamedEndpoint endpointName, final SocketAddress localAddress, final Timeout connectTimeout, @@ -104,7 +121,14 @@ public Future connect( Args.notNull(endpointHost, "Host"); final ComplexFuture future = new ComplexFuture<>(callback); final HttpHost remoteEndpoint = RoutingSupport.normalize(endpointHost, schemePortResolver); - final InetAddress remoteAddress = endpointHost.getAddress(); + final SocketAddress remoteAddress; + if (unixDomainSocket == null) { + final InetAddress remoteInetAddress = endpointHost.getAddress(); + remoteAddress = remoteInetAddress != null ? + new InetSocketAddress(remoteInetAddress, remoteEndpoint.getPort()) : null; + } else { + remoteAddress = createUnixSocketAddress(unixDomainSocket); + } final TlsConfig tlsConfig = attachment instanceof TlsConfig ? (TlsConfig) attachment : TlsConfig.DEFAULT; onBeforeSocketConnect(context, endpointHost); @@ -115,7 +139,7 @@ public Future connect( final Future sessionFuture = sessionRequester.connect( connectionInitiator, remoteEndpoint, - remoteAddress != null ? new InetSocketAddress(remoteAddress, remoteEndpoint.getPort()) : null, + remoteAddress, localAddress, connectTimeout, tlsConfig.getHttpVersionPolicy(), @@ -180,6 +204,18 @@ public void cancelled() { return future; } + // The IOReactor does not support AFUNIXSocketChannel from JUnixSocket, so if a Unix domain socket was configured, + // we must use JEP 380 sockets and addresses. + private static SocketAddress createUnixSocketAddress(final Path socketPath) { + try { + final Class addressClass = Class.forName("java.net.UnixDomainSocketAddress"); + final Method ofMethod = addressClass.getMethod("of", Path.class); + return (SocketAddress) ofMethod.invoke(null, socketPath); + } catch (final ReflectiveOperationException ex) { + throw new UnsupportedOperationException("Async Unix domain socket support requires Java 16 or later", ex); + } + } + @Override public void upgrade( final ManagedAsyncClientConnection connection, diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java index cd5473f2f1..35fc663266 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java @@ -27,6 +27,7 @@ package org.apache.hc.client5.http.impl.nio; +import java.nio.file.Path; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -446,6 +447,7 @@ public Future connect( } final PoolEntry poolEntry = internalEndpoint.getPoolEntry(); final HttpRoute route = poolEntry.getRoute(); + final Path unixDomainSocket = route.getUnixDomainSocket(); final HttpHost firstHop = route.getProxyHost() != null ? route.getProxyHost() : route.getTargetHost(); final ConnectionConfig connectionConfig = resolveConnectionConfig(route); final Timeout connectTimeout = timeout != null ? timeout : connectionConfig.getConnectTimeout(); @@ -456,6 +458,7 @@ public Future connect( final Future connectFuture = connectionOperator.connect( connectionInitiator, firstHop, + unixDomainSocket, route.getTargetName(), route.getLocalSocketAddress(), connectTimeout, diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/routing/DefaultRoutePlanner.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/routing/DefaultRoutePlanner.java index 076a750a7d..578fa55a97 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/routing/DefaultRoutePlanner.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/routing/DefaultRoutePlanner.java @@ -28,6 +28,7 @@ package org.apache.hc.client5.http.impl.routing; import java.net.InetAddress; +import java.nio.file.Path; import java.util.Objects; import org.apache.hc.client5.http.HttpRoute; @@ -94,8 +95,17 @@ public final HttpRoute determineRoute(final HttpHost host, final HttpRequest req } else { authority = null; } - final InetAddress inetAddress = determineLocalAddress(target, context); + final Path unixDomainSocket = config.getUnixDomainSocket(); + if (unixDomainSocket != null) { + if (proxy != null) { + throw new UnsupportedOperationException("Proxies are not supported over Unix domain sockets"); + } else if (secure) { + throw new UnsupportedOperationException("HTTPS is not supported over Unix domain sockets"); + } + return new HttpRoute(target, unixDomainSocket); + } + final InetAddress inetAddress = determineLocalAddress(target, context); if (proxy == null) { return new HttpRoute(target, authority, inetAddress, secure); } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/io/HttpClientConnectionOperator.java b/httpclient5/src/main/java/org/apache/hc/client5/http/io/HttpClientConnectionOperator.java index 5489c6ac68..2685accc9b 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/io/HttpClientConnectionOperator.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/io/HttpClientConnectionOperator.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.nio.file.Path; import org.apache.hc.core5.annotation.Contract; import org.apache.hc.core5.annotation.Internal; @@ -94,6 +95,38 @@ default void connect( connect(conn, endpointHost, localAddress, connectTimeout, socketConfig, context); } + /** + * Connect the given managed connection to the remote endpoint. + * + * @param conn the managed connection. + * @param endpointHost the address of the remote endpoint. + * @param unixDomainSocket the path to the Unix domain socket, or {@code null} if none. + * @param endpointName the name of the remote endpoint, if different from the endpoint host name, + * {@code null} otherwise. Usually taken from the request URU authority. + * @param localAddress the address of the local endpoint. + * @param connectTimeout the timeout of the connect operation. + * @param socketConfig the socket configuration. + * @param attachment connect request attachment. + * @param context the execution context. + * + * @since 5.6 + */ + default void connect( + ManagedHttpClientConnection conn, + HttpHost endpointHost, + NamedEndpoint endpointName, + Path unixDomainSocket, + InetSocketAddress localAddress, + Timeout connectTimeout, + SocketConfig socketConfig, + Object attachment, + HttpContext context) throws IOException { + if (unixDomainSocket != null) { + throw new UnsupportedOperationException(getClass().getName() + " does not support Unix domain sockets"); + } + connect(conn, endpointHost, localAddress, connectTimeout, socketConfig, context); + } + /** * Upgrades transport security of the given managed connection * by using the TLS security protocol. @@ -128,5 +161,4 @@ default void upgrade( HttpContext context) throws IOException { upgrade(conn, endpointHost, context); } - } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncClientConnectionOperator.java b/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncClientConnectionOperator.java index 30861492e0..f8c5c95022 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncClientConnectionOperator.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncClientConnectionOperator.java @@ -28,6 +28,7 @@ package org.apache.hc.client5.http.nio; import java.net.SocketAddress; +import java.nio.file.Path; import java.util.concurrent.Future; import org.apache.hc.core5.annotation.Contract; @@ -98,6 +99,40 @@ default Future connect( attachment, callback); } + /** + * Initiates operation to create a connection to the remote endpoint using + * the provided {@link ConnectionInitiator}. + * + * @param connectionInitiator the connection initiator. + * @param endpointHost the address of the remote endpoint. + * @param unixDomainSocket the path to the UDS through which to connect, or {@code null}. + * @param endpointName the name of the remote endpoint, if different from the endpoint host name, + * {@code null} otherwise. Usually taken from the request URU authority. + * @param localAddress the address of the local endpoint. + * @param connectTimeout the timeout of the connect operation. + * @param attachment the attachment, which can be any object representing custom parameter + * of the operation. + * @param context the execution context. + * @param callback the future result callback. + * @since 5.6 + */ + default Future connect( + ConnectionInitiator connectionInitiator, + HttpHost endpointHost, + Path unixDomainSocket, + NamedEndpoint endpointName, + SocketAddress localAddress, + Timeout connectTimeout, + Object attachment, + HttpContext context, + FutureCallback callback) { + if (unixDomainSocket != null) { + throw new UnsupportedOperationException(getClass().getName() + " does not support Unix domain sockets"); + } + return connect(connectionInitiator, endpointHost, localAddress, connectTimeout, + attachment, callback); + } + /** * Upgrades transport security of the given managed connection * by using the TLS security protocol. diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/socket/UnixDomainSocketFactory.java b/httpclient5/src/main/java/org/apache/hc/client5/http/socket/UnixDomainSocketFactory.java new file mode 100644 index 0000000000..bd8a4a2d92 --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/socket/UnixDomainSocketFactory.java @@ -0,0 +1,179 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.client5.http.socket; + +import org.apache.hc.core5.annotation.Contract; +import org.apache.hc.core5.annotation.Internal; +import org.apache.hc.core5.annotation.ThreadingBehavior; +import org.apache.hc.core5.util.Args; +import org.apache.hc.core5.util.TimeValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.file.Path; + +/** + * A factory for Unix domain sockets. + *

+ * This implementation supports both the JDK16+ standard library implementation (JEP 380) and the JUnixSocket library. + * It will automatically detect which implementation is available and use it. JUnixSocket is preferred, since it + * supports the {@link java.net.Socket} API used by the classic client. + *

+ * + * @since 5.6 + */ +@Contract(threading = ThreadingBehavior.STATELESS) +@Internal +public final class UnixDomainSocketFactory { + private static final Logger LOG = LoggerFactory.getLogger(UnixDomainSocketFactory.class); + + private static final String JDK_UNIX_SOCKET_ADDRESS_CLASS = "java.net.UnixDomainSocketAddress"; + private static final String JUNIXSOCKET_SOCKET_CLASS = "org.newsclub.net.unix.AFUNIXSocket"; + private static final String JUNIXSOCKET_ADDRESS_CLASS = "org.newsclub.net.unix.AFUNIXSocketAddress"; + + private enum Implementation { + JDK, + JUNIXSOCKET, + NONE + } + + private static final Implementation IMPLEMENTATION = detectImplementation(); + + private static Implementation detectImplementation() { + try { + Class.forName(JUNIXSOCKET_SOCKET_CLASS); + LOG.debug("Using JUnixSocket Unix Domain Socket implementation"); + return Implementation.JUNIXSOCKET; + } catch (final ClassNotFoundException e) { + try { + Class.forName(JDK_UNIX_SOCKET_ADDRESS_CLASS); + LOG.debug("Using JDK Unix Domain Socket implementation"); + return Implementation.JDK; + } catch (final ClassNotFoundException e2) { + LOG.debug("No Unix Domain Socket implementation found"); + return Implementation.NONE; + } + } + } + + /** + * Checks if Unix Domain Socket support is available. + * + * @return true if Unix Domain Socket support is available, false otherwise + */ + public static boolean isAvailable() { + return IMPLEMENTATION != Implementation.NONE; + } + + /** + * Default instance of {@link UnixDomainSocketFactory}. + */ + private static final UnixDomainSocketFactory INSTANCE = new UnixDomainSocketFactory(); + + /** + * Gets the singleton instance of {@link UnixDomainSocketFactory}. + * + * @return the singleton instance + */ + public static UnixDomainSocketFactory getSocketFactory() { + return INSTANCE; + } + + public SocketAddress createSocketAddress(final Path socketPath) { + if (!isAvailable()) { + throw new UnsupportedOperationException("Unix Domain Socket support is not available"); + } + Args.notNull(socketPath, "Unix domain socket path"); + + try { + if (IMPLEMENTATION == Implementation.JDK) { + // JDK implementation + final Class addressClass = Class.forName(JDK_UNIX_SOCKET_ADDRESS_CLASS); + final Method ofMethod = addressClass.getMethod("of", Path.class); + return (SocketAddress) ofMethod.invoke(null, socketPath); + } else { + // JUnixSocket implementation + final Class addressClass = Class.forName(JUNIXSOCKET_ADDRESS_CLASS); + final Method ofMethod = addressClass.getMethod("of", Path.class); + return (SocketAddress) ofMethod.invoke(null, socketPath); + } + } catch (final ReflectiveOperationException ex) { + throw new RuntimeException("Could not create UDS SocketAddress", ex); + } + } + + public Socket createSocket() throws IOException { + if (!isAvailable()) { + throw new UnsupportedOperationException("Unix Domain Socket support is not available"); + } + + try { + if (IMPLEMENTATION == Implementation.JDK) { + // Java 16+ only supports UDS through the SocketChannel API, but the sync client is coupled + // to the legacy Socket API. In order to use Java sockets, we first need to write an + // adapter, similar to the one provided by JUnixSocket. + throw new UnsupportedOperationException("JEP 380 Unix domain sockets are not supported; use " + + "JUnixSocket"); + } else { + // JUnixSocket implementation + final Class socketClass = Class.forName(JUNIXSOCKET_SOCKET_CLASS); + final Method newInstanceMethod = socketClass.getMethod("newInstance"); + return (Socket) newInstanceMethod.invoke(null); + } + } catch (final Exception e) { + throw new IOException("Failed to create Unix domain socket", e); + } + } + + public Socket connectSocket( + final Socket socket, + final Path socketPath, + final TimeValue connectTimeout + ) throws IOException { + Args.notNull(socketPath, "Unix domain socket path"); + + final Socket sock = socket != null ? socket : createSocket(); + final SocketAddress address = createSocketAddress(socketPath); + final int connTimeoutMs = TimeValue.isPositive(connectTimeout) ? connectTimeout.toMillisecondsIntBound() : 0; + + try { + sock.connect(address, connTimeoutMs); + return sock; + } catch (final IOException ex) { + try { + sock.close(); + } catch (final IOException ignore) { + } + throw ex; + } + } +} diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/TestHttpRoute.java b/httpclient5/src/test/java/org/apache/hc/client5/http/TestHttpRoute.java index 7be5443057..35d49354e2 100644 --- a/httpclient5/src/test/java/org/apache/hc/client5/http/TestHttpRoute.java +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/TestHttpRoute.java @@ -28,7 +28,11 @@ package org.apache.hc.client5.http; import java.net.InetAddress; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Set; import org.apache.hc.client5.http.RouteInfo.LayerType; @@ -454,4 +458,44 @@ void testImmutable() throws CloneNotSupportedException { Assertions.assertEquals(route3, route1, "route was modified"); } + @Test + void testUnixDomainSocketModeling() { + final Path uds1 = Paths.get("/var/run/docker.sock"); + final HttpRoute route1 = new HttpRoute(TARGET1, uds1); + + Assertions.assertEquals(uds1, route1.getUnixDomainSocket()); + Assertions.assertEquals(1, route1.getHopCount(), "A UDS is not considered a proxy hop"); + Assertions.assertNull(route1.getProxyHost(), "A UDS is not considered a proxy for routing purposes"); + Assertions.assertEquals("/var/run/docker.sock->{}->[http://target1.test.invalid:80]", route1.toString()); + + final Path uds2 = Paths.get("/var/run/docker.sock"); + final HttpRoute route2 = new HttpRoute(TARGET1, null, null, Collections.emptyList(), uds2, false, + TunnelType.PLAIN, LayerType.PLAIN); + final HttpRoute route3 = new HttpRoute(TARGET1, null, null, Collections.emptyList(), null, false, + TunnelType.PLAIN, LayerType.PLAIN); + + Assertions.assertEquals(route1, route2, + "The UDS convenience constructor should produce an equivalent HttpRoute to the full constructor"); + Assertions.assertNotEquals(route2, route3, "HttpRoute equality should consider the UDS field"); + Assertions.assertNotEquals(route2.hashCode(), route3.hashCode(), + "HttpRoute hashing should consider the UDS field"); + } + + @Test + void testUnixDomainSocketValidation() { + final Path uds = Paths.get("/var/run/docker.sock"); + final List noProxies = Collections.emptyList(); + final List oneProxy = Collections.singletonList(PROXY1); + new HttpRoute(TARGET1, null, null, noProxies, uds, false, TunnelType.PLAIN, LayerType.PLAIN); + Assertions.assertThrows(RuntimeException.class, () -> + new HttpRoute(TARGET1, null, null, noProxies, uds, true, TunnelType.PLAIN, LayerType.PLAIN)); + Assertions.assertThrows(RuntimeException.class, () -> + new HttpRoute(TARGET1, null, LOCAL41, noProxies, uds, false, TunnelType.PLAIN, LayerType.PLAIN)); + Assertions.assertThrows(RuntimeException.class, () -> + new HttpRoute(TARGET1, null, null, oneProxy, uds, false, TunnelType.PLAIN, LayerType.PLAIN)); + Assertions.assertThrows(RuntimeException.class, () -> + new HttpRoute(TARGET1, null, null, noProxies, uds, false, TunnelType.TUNNELLED, LayerType.PLAIN)); + Assertions.assertThrows(RuntimeException.class, () -> + new HttpRoute(TARGET1, null, null, noProxies, uds, false, TunnelType.PLAIN, LayerType.LAYERED)); + } } diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/examples/UnixDomainSocket.java b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/UnixDomainSocket.java new file mode 100644 index 0000000000..7251832b82 --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/UnixDomainSocket.java @@ -0,0 +1,75 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.examples; + +import org.apache.hc.client5.http.classic.methods.HttpGet; +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintStream; +import java.nio.file.Path; + +public class UnixDomainSocket { + public static void main(final String[] args) throws IOException { + if (args.length == 0 || "-h".equals(args[0]) || "--help".equals(args[0])) { + usage(System.out); + return; + } else if (args.length != 2) { + usage(System.err); + return; + } + + final Path unixDomainSocket = new File(args[0]).toPath(); + final String uri = args[1]; + + try (CloseableHttpClient client = HttpClientBuilder.create().build()) { + final HttpGet httpGet = new HttpGet(uri); + httpGet.setConfig(RequestConfig.custom().setUnixDomainSocket(unixDomainSocket).build()); + client.execute(httpGet, classicHttpResponse -> { + final InputStream inputStream = classicHttpResponse.getEntity().getContent(); + final byte[] buf = new byte[8192]; + int len; + while ((len = inputStream.read(buf)) > 0) { + System.out.write(buf, 0, len); + } + return null; + }); + } + } + + private static void usage(final PrintStream printStream) { + printStream.println("Usage: UnixDomainSocket [path] [uri]"); + printStream.println(); + printStream.println("Examples:"); + printStream.println("UnixDomainSocket /var/run/docker.sock 'http://localhost/info'"); + printStream.println("UnixDomainSocket /var/run/docker.sock 'http://localhost/containers/json?all=1'"); + } +} diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/examples/UnixDomainSocketAsync.java b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/UnixDomainSocketAsync.java new file mode 100644 index 0000000000..df6a14596c --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/UnixDomainSocketAsync.java @@ -0,0 +1,93 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.examples; + +import io.reactivex.Observable; +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleRequestProducer; +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.Method; +import org.apache.hc.core5.reactive.ReactiveResponseConsumer; +import org.reactivestreams.Publisher; + +import java.io.File; +import java.io.PrintStream; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class UnixDomainSocketAsync { + public static void main(final String[] args) throws Exception { + if (args.length == 0 || "-h".equals(args[0]) || "--help".equals(args[0])) { + usage(System.out); + return; + } else if (args.length != 2) { + usage(System.err); + return; + } + + final Path unixDomainSocket = new File(args[0]).toPath(); + final String uri = args[1]; + + final RequestConfig requestConfig = RequestConfig.custom().setUnixDomainSocket(unixDomainSocket).build(); + try (CloseableHttpAsyncClient client = HttpAsyncClientBuilder.create() + .setDefaultRequestConfig(requestConfig) + .build()) { + client.start(); + + final SimpleHttpRequest httpGet = SimpleHttpRequest.create(Method.GET.name(), uri); + httpGet.setConfig(requestConfig); + + final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer(); + client.execute(SimpleRequestProducer.create(httpGet), consumer, null).get(10, TimeUnit.SECONDS); + final Message> message = consumer.getResponseFuture() + .get(10, TimeUnit.SECONDS); + final List bufs = Observable.fromPublisher(message.getBody()) + .collectInto(new ArrayList(), List::add) + .blockingGet(); + for (final ByteBuffer buf : bufs) { + final byte[] bytes = new byte[buf.remaining()]; + buf.get(bytes); + System.out.write(bytes); + } + } + } + + private static void usage(final PrintStream printStream) { + printStream.println("Usage: UnixDomainSocketAsync [path] [uri]"); + printStream.println(); + printStream.println("Examples:"); + printStream.println("UnixDomainSocketAsync /var/run/docker.sock 'http://localhost/info'"); + printStream.println("UnixDomainSocketAsync /var/run/docker.sock 'http://localhost/containers/json?all=1'"); + } +} diff --git a/pom.xml b/pom.xml index 03abba507e..6aa02898fb 100644 --- a/pom.xml +++ b/pom.xml @@ -75,6 +75,7 @@ 1 2.2.21 1.21.1 + 2.10.1 5.3 javax.net.ssl.SSLEngine,javax.net.ssl.SSLParameters,java.nio.ByteBuffer,java.nio.CharBuffer @@ -163,6 +164,12 @@ ${rxjava.version} test
+ + com.kohlschutter.junixsocket + junixsocket-core + ${junixsocket.version} + pom + org.junit junit-bom @@ -438,4 +445,4 @@ - \ No newline at end of file +