Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions xds/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ java_proto_library(
"@envoy_api//envoy/extensions/load_balancing_policies/ring_hash/v3:pkg",
"@envoy_api//envoy/extensions/load_balancing_policies/round_robin/v3:pkg",
"@envoy_api//envoy/extensions/load_balancing_policies/wrr_locality/v3:pkg",
"@envoy_api//envoy/extensions/transport_sockets/http_11_proxy/v3:pkg",
"@envoy_api//envoy/extensions/transport_sockets/tls/v3:pkg",
"@envoy_api//envoy/service/discovery/v3:pkg",
"@envoy_api//envoy/service/load_stats/v3:pkg",
Expand Down
4 changes: 3 additions & 1 deletion xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,9 @@ private void handleClusterDiscovered() {
}

ClusterResolverConfig config = new ClusterResolverConfig(
Collections.unmodifiableList(instances), configOrError.getConfig());
Collections.unmodifiableList(instances),
configOrError.getConfig(),
root.result.isHttp11ProxyAvailable());
if (childLb == null) {
childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper);
}
Expand Down
46 changes: 44 additions & 2 deletions xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.protobuf.Struct;
import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.HttpConnectProxiedSocketAddress;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerProvider;
Expand Down Expand Up @@ -59,6 +60,8 @@
import io.grpc.xds.client.XdsClient.ResourceWatcher;
import io.grpc.xds.client.XdsLogger;
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
Expand Down Expand Up @@ -430,8 +433,18 @@
.set(XdsAttributes.ATTR_SERVER_WEIGHT, weight)
.set(XdsAttributes.ATTR_ADDRESS_NAME, endpoint.hostname())
.build();
EquivalentAddressGroup eag = new EquivalentAddressGroup(
endpoint.eag().getAddresses(), attr);

EquivalentAddressGroup eag;
if (config.isHttp11ProxyAvailable()) {
List<SocketAddress> rewrittenAddresses = new ArrayList<>();
for (SocketAddress addr : endpoint.eag().getAddresses()) {
rewrittenAddresses.add(rewriteAddress(
addr, endpoint.endpointMetadata(), localityLbInfo.localityMetadata()));
}
eag = new EquivalentAddressGroup(rewrittenAddresses, attr);
} else {
eag = new EquivalentAddressGroup(endpoint.eag().getAddresses(), attr);
}
eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName));
addresses.add(eag);
}
Expand Down Expand Up @@ -469,6 +482,35 @@
new EndpointsUpdated().run();
}

private SocketAddress rewriteAddress(SocketAddress addr,
ImmutableMap<String, Object> endpointMetadata,
ImmutableMap<String, Object> localityMetadata) {
if (!(addr instanceof InetSocketAddress)) {
return addr;

Check warning on line 489 in xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java#L489

Added line #L489 was not covered by tests
}

SocketAddress proxyAddress;
try {
proxyAddress = (SocketAddress) endpointMetadata.get(
"envoy.http11_proxy_transport_socket.proxy_address");
if (proxyAddress == null) {
proxyAddress = (SocketAddress) localityMetadata.get(
"envoy.http11_proxy_transport_socket.proxy_address");
}
} catch (ClassCastException e) {
return addr;

Check warning on line 501 in xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java#L500-L501

Added lines #L500 - L501 were not covered by tests
}

if (proxyAddress == null) {
return addr;

Check warning on line 505 in xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java#L505

Added line #L505 was not covered by tests
}

return HttpConnectProxiedSocketAddress.newBuilder()
.setTargetAddress((InetSocketAddress) addr)
.setProxyAddress(proxyAddress)
.build();
}

private List<String> generatePriorityNames(String name,
Map<Locality, LocalityLbEndpoints> localityLbEndpoints) {
TreeMap<Integer, List<Locality>> todo = new TreeMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,17 @@ static final class ClusterResolverConfig {
final List<DiscoveryMechanism> discoveryMechanisms;
// GracefulSwitch configuration
final Object lbConfig;
private final boolean isHttp11ProxyAvailable;

ClusterResolverConfig(List<DiscoveryMechanism> discoveryMechanisms, Object lbConfig) {
ClusterResolverConfig(List<DiscoveryMechanism> discoveryMechanisms, Object lbConfig,
boolean isHttp11ProxyAvailable) {
this.discoveryMechanisms = checkNotNull(discoveryMechanisms, "discoveryMechanisms");
this.lbConfig = checkNotNull(lbConfig, "lbConfig");
this.isHttp11ProxyAvailable = isHttp11ProxyAvailable;
}

boolean isHttp11ProxyAvailable() {
return isHttp11ProxyAvailable;
}

@Override
Expand Down
20 changes: 13 additions & 7 deletions xds/src/main/java/io/grpc/xds/Endpoints.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.grpc.EquivalentAddressGroup;
import java.net.InetSocketAddress;
import java.util.List;
Expand All @@ -41,11 +42,13 @@ abstract static class LocalityLbEndpoints {
// Locality's priority level.
abstract int priority();

abstract ImmutableMap<String, Object> localityMetadata();

static LocalityLbEndpoints create(List<LbEndpoint> endpoints, int localityWeight,
int priority) {
int priority, ImmutableMap<String, Object> localityMetadata) {
checkArgument(localityWeight > 0, "localityWeight must be greater than 0");
return new AutoValue_Endpoints_LocalityLbEndpoints(
ImmutableList.copyOf(endpoints), localityWeight, priority);
ImmutableList.copyOf(endpoints), localityWeight, priority, localityMetadata);
}
}

Expand All @@ -63,17 +66,20 @@ abstract static class LbEndpoint {

abstract String hostname();

abstract ImmutableMap<String, Object> endpointMetadata();

static LbEndpoint create(EquivalentAddressGroup eag, int loadBalancingWeight,
boolean isHealthy, String hostname) {
return new AutoValue_Endpoints_LbEndpoint(eag, loadBalancingWeight, isHealthy, hostname);
boolean isHealthy, String hostname, ImmutableMap<String, Object> endpointMetadata) {
return new AutoValue_Endpoints_LbEndpoint(
eag, loadBalancingWeight, isHealthy, hostname, endpointMetadata);
}

// Only for testing.
@VisibleForTesting
static LbEndpoint create(
String address, int port, int loadBalancingWeight, boolean isHealthy, String hostname) {
static LbEndpoint create(String address, int port, int loadBalancingWeight, boolean isHealthy,
String hostname, ImmutableMap<String, Object> endpointMetadata) {
return LbEndpoint.create(new EquivalentAddressGroup(new InetSocketAddress(address, port)),
loadBalancingWeight, isHealthy, hostname);
loadBalancingWeight, isHealthy, hostname, endpointMetadata);
}
}

Expand Down
12 changes: 9 additions & 3 deletions xds/src/main/java/io/grpc/xds/GcpAuthenticationFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.grpc.Status;
import io.grpc.auth.MoreCallCredentials;
import io.grpc.xds.MetadataRegistry.MetadataValueParser;
import io.grpc.xds.client.XdsResourceType.ResourceInvalidException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -240,11 +241,16 @@
}

@Override
public String parse(Any any) throws InvalidProtocolBufferException {
Audience audience = any.unpack(Audience.class);
public String parse(Any any) throws ResourceInvalidException {
Audience audience;
try {
audience = any.unpack(Audience.class);
} catch (InvalidProtocolBufferException ex) {
throw new ResourceInvalidException("Invalid Resource in address proto", ex);

Check warning on line 249 in xds/src/main/java/io/grpc/xds/GcpAuthenticationFilter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/GcpAuthenticationFilter.java#L248-L249

Added lines #L248 - L249 were not covered by tests
}
String url = audience.getUrl();
if (url.isEmpty()) {
throw new InvalidProtocolBufferException(
throw new ResourceInvalidException(

Check warning on line 253 in xds/src/main/java/io/grpc/xds/GcpAuthenticationFilter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/GcpAuthenticationFilter.java#L253

Added line #L253 was not covered by tests
"Audience URL is empty. Metadata value must contain a valid URL.");
}
return url;
Expand Down
60 changes: 57 additions & 3 deletions xds/src/main/java/io/grpc/xds/MetadataRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@
package io.grpc.xds;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Struct;
import io.envoyproxy.envoy.config.core.v3.Metadata;
import io.grpc.xds.GcpAuthenticationFilter.AudienceMetadataParser;
import io.grpc.xds.XdsEndpointResource.AddressMetadataParser;
import io.grpc.xds.client.XdsResourceType.ResourceInvalidException;
import io.grpc.xds.internal.ProtobufJsonConverter;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -36,6 +41,7 @@

private MetadataRegistry() {
registerParser(new AudienceMetadataParser());
registerParser(new AddressMetadataParser());
}

static MetadataRegistry getInstance() {
Expand All @@ -55,6 +61,54 @@
supportedParsers.remove(parser.getTypeUrl());
}

/**
* Parses cluster metadata into a structured map.
*
* <p>Values in {@code typed_filter_metadata} take precedence over
* {@code filter_metadata} when keys overlap, following Envoy API behavior. See
* <a href="https://github.com/envoyproxy/envoy/blob/main/api/envoy/config/core/v3/base.proto#L217-L259">
* Envoy metadata documentation </a> for details.
*
* @param metadata the {@link Metadata} containing the fields to parse.
* @return an immutable map of parsed metadata.
* @throws ResourceInvalidException if parsing {@code typed_filter_metadata} fails.
*/
public ImmutableMap<String, Object> parseMetadata(Metadata metadata)
throws ResourceInvalidException {
ImmutableMap.Builder<String, Object> parsedMetadata = ImmutableMap.builder();

// Process typed_filter_metadata
for (Map.Entry<String, Any> entry : metadata.getTypedFilterMetadataMap().entrySet()) {
String key = entry.getKey();
Any value = entry.getValue();
MetadataValueParser parser = findParser(value.getTypeUrl());
if (parser != null) {
try {
Object parsedValue = parser.parse(value);
parsedMetadata.put(key, parsedValue);
} catch (ResourceInvalidException e) {
throw new ResourceInvalidException(
String.format("Failed to parse metadata key: %s, type: %s. Error: %s",
key, value.getTypeUrl(), e.getMessage()), e);

Check warning on line 92 in xds/src/main/java/io/grpc/xds/MetadataRegistry.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/MetadataRegistry.java#L89-L92

Added lines #L89 - L92 were not covered by tests
}
}
}
// building once to reuse in the next loop
ImmutableMap<String, Object> intermediateParsedMetadata = parsedMetadata.build();

// Process filter_metadata for remaining keys
for (Map.Entry<String, Struct> entry : metadata.getFilterMetadataMap().entrySet()) {
String key = entry.getKey();
if (!intermediateParsedMetadata.containsKey(key)) {
Struct structValue = entry.getValue();
Object jsonValue = ProtobufJsonConverter.convertToJson(structValue);
parsedMetadata.put(key, jsonValue);
}
}

return parsedMetadata.build();
}

interface MetadataValueParser {

String getTypeUrl();
Expand All @@ -64,8 +118,8 @@
*
* @param any the {@link Any} object to parse.
* @return the parsed metadata value.
* @throws InvalidProtocolBufferException if the parsing fails.
* @throws ResourceInvalidException if the parsing fails.
*/
Object parse(Any any) throws InvalidProtocolBufferException;
Object parse(Any any) throws ResourceInvalidException;
}
}
Loading