|
25 | 25 | import com.google.common.base.Strings; |
26 | 26 | import com.google.common.collect.ImmutableList; |
27 | 27 | import com.google.common.collect.ImmutableMap; |
| 28 | +import com.google.protobuf.Any; |
28 | 29 | import com.google.protobuf.Duration; |
29 | 30 | import com.google.protobuf.InvalidProtocolBufferException; |
30 | 31 | import com.google.protobuf.Message; |
31 | 32 | import com.google.protobuf.Struct; |
32 | 33 | import com.google.protobuf.util.Durations; |
33 | 34 | import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers.Thresholds; |
34 | 35 | import io.envoyproxy.envoy.config.cluster.v3.Cluster; |
| 36 | +import io.envoyproxy.envoy.config.core.v3.Metadata; |
35 | 37 | import io.envoyproxy.envoy.config.core.v3.RoutingPriority; |
36 | 38 | import io.envoyproxy.envoy.config.core.v3.SocketAddress; |
37 | 39 | import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; |
|
44 | 46 | import io.grpc.internal.ServiceConfigUtil.LbConfig; |
45 | 47 | import io.grpc.xds.EnvoyServerProtoData.OutlierDetection; |
46 | 48 | import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; |
| 49 | +import io.grpc.xds.MetadataRegistry.MetadataValueParser; |
47 | 50 | import io.grpc.xds.XdsClusterResource.CdsUpdate; |
48 | 51 | import io.grpc.xds.client.XdsClient.ResourceUpdate; |
49 | 52 | import io.grpc.xds.client.XdsResourceType; |
| 53 | +import io.grpc.xds.internal.ProtobufJsonConverter; |
50 | 54 | import io.grpc.xds.internal.security.CommonTlsContextUtil; |
51 | 55 | import java.util.List; |
52 | 56 | import java.util.Locale; |
| 57 | +import java.util.Map; |
53 | 58 | import java.util.Set; |
54 | 59 | import javax.annotation.Nullable; |
55 | 60 |
|
@@ -171,9 +176,62 @@ static CdsUpdate processCluster(Cluster cluster, |
171 | 176 | updateBuilder.filterMetadata( |
172 | 177 | ImmutableMap.copyOf(cluster.getMetadata().getFilterMetadataMap())); |
173 | 178 |
|
| 179 | + try { |
| 180 | + ImmutableMap<String, Object> parsedFilterMetadata = |
| 181 | + parseClusterMetadata(cluster.getMetadata()); |
| 182 | + updateBuilder.parsedMetadata(parsedFilterMetadata); |
| 183 | + } catch (InvalidProtocolBufferException e) { |
| 184 | + throw new ResourceInvalidException( |
| 185 | + "Failed to parse xDS filter metadata for cluster '" + cluster.getName() + "': " |
| 186 | + + e.getMessage(), e); |
| 187 | + } |
| 188 | + |
174 | 189 | return updateBuilder.build(); |
175 | 190 | } |
176 | 191 |
|
| 192 | + /** |
| 193 | + * Parses cluster metadata into a structured map. |
| 194 | + * |
| 195 | + * <p>Values in {@code typed_filter_metadata} take precedence over |
| 196 | + * {@code filter_metadata} when keys overlap, following Envoy API behavior. See |
| 197 | + * <a href="https://github.com/envoyproxy/envoy/blob/main/api/envoy/config/core/v3/base.proto#L217-L259"> |
| 198 | + * Envoy metadata documentation </a> for details. |
| 199 | + * |
| 200 | + * @param metadata the {@link Metadata} containing the fields to parse. |
| 201 | + * @return an immutable map of parsed metadata. |
| 202 | + * @throws InvalidProtocolBufferException if parsing {@code typed_filter_metadata} fails. |
| 203 | + */ |
| 204 | + private static ImmutableMap<String, Object> parseClusterMetadata(Metadata metadata) |
| 205 | + throws InvalidProtocolBufferException { |
| 206 | + ImmutableMap.Builder<String, Object> parsedMetadata = ImmutableMap.builder(); |
| 207 | + |
| 208 | + MetadataRegistry registry = MetadataRegistry.getInstance(); |
| 209 | + // Process typed_filter_metadata |
| 210 | + for (Map.Entry<String, Any> entry : metadata.getTypedFilterMetadataMap().entrySet()) { |
| 211 | + String key = entry.getKey(); |
| 212 | + Any value = entry.getValue(); |
| 213 | + MetadataValueParser parser = registry.findParser(value.getTypeUrl()); |
| 214 | + if (parser != null) { |
| 215 | + Object parsedValue = parser.parse(value); |
| 216 | + parsedMetadata.put(key, parsedValue); |
| 217 | + } |
| 218 | + } |
| 219 | + // building once to reuse in the next loop |
| 220 | + ImmutableMap<String, Object> intermediateParsedMetadata = parsedMetadata.build(); |
| 221 | + |
| 222 | + // Process filter_metadata for remaining keys |
| 223 | + for (Map.Entry<String, Struct> entry : metadata.getFilterMetadataMap().entrySet()) { |
| 224 | + String key = entry.getKey(); |
| 225 | + if (!intermediateParsedMetadata.containsKey(key)) { |
| 226 | + Struct structValue = entry.getValue(); |
| 227 | + Object jsonValue = ProtobufJsonConverter.convertToJson(structValue); |
| 228 | + parsedMetadata.put(key, jsonValue); |
| 229 | + } |
| 230 | + } |
| 231 | + |
| 232 | + return parsedMetadata.build(); |
| 233 | + } |
| 234 | + |
177 | 235 | private static StructOrError<CdsUpdate.Builder> parseAggregateCluster(Cluster cluster) { |
178 | 236 | String clusterName = cluster.getName(); |
179 | 237 | Cluster.CustomClusterType customType = cluster.getClusterType(); |
@@ -573,13 +631,16 @@ abstract static class CdsUpdate implements ResourceUpdate { |
573 | 631 |
|
574 | 632 | abstract ImmutableMap<String, Struct> filterMetadata(); |
575 | 633 |
|
| 634 | + abstract ImmutableMap<String, Object> parsedMetadata(); |
| 635 | + |
576 | 636 | private static Builder newBuilder(String clusterName) { |
577 | 637 | return new AutoValue_XdsClusterResource_CdsUpdate.Builder() |
578 | 638 | .clusterName(clusterName) |
579 | 639 | .minRingSize(0) |
580 | 640 | .maxRingSize(0) |
581 | 641 | .choiceCount(0) |
582 | | - .filterMetadata(ImmutableMap.of()); |
| 642 | + .filterMetadata(ImmutableMap.of()) |
| 643 | + .parsedMetadata(ImmutableMap.of()); |
583 | 644 | } |
584 | 645 |
|
585 | 646 | static Builder forAggregate(String clusterName, List<String> prioritizedClusterNames) { |
@@ -698,6 +759,8 @@ Builder leastRequestLbPolicy(Integer choiceCount) { |
698 | 759 |
|
699 | 760 | protected abstract Builder filterMetadata(ImmutableMap<String, Struct> filterMetadata); |
700 | 761 |
|
| 762 | + protected abstract Builder parsedMetadata(ImmutableMap<String, Object> parsedMetadata); |
| 763 | + |
701 | 764 | abstract CdsUpdate build(); |
702 | 765 | } |
703 | 766 | } |
|
0 commit comments