diff --git a/xds/src/main/java/io/grpc/xds/BackendMetricPropagation.java b/xds/src/main/java/io/grpc/xds/BackendMetricPropagation.java new file mode 100644 index 00000000000..bbd8e8e70fb --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/BackendMetricPropagation.java @@ -0,0 +1,110 @@ +/* + * Copyright 2025 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.collect.ImmutableSet; +import io.grpc.Internal; +import javax.annotation.Nullable; + +/** + * Represents the configuration for which ORCA metrics should be propagated from backend + * to LRS load reports, as defined in gRFC A85. + */ +@Internal +public final class BackendMetricPropagation { + + public final boolean propagateCpuUtilization; + public final boolean propagateMemUtilization; + public final boolean propagateApplicationUtilization; + + private final boolean propagateAllNamedMetrics; + private final ImmutableSet namedMetricKeys; + + private BackendMetricPropagation( + boolean propagateCpuUtilization, + boolean propagateMemUtilization, + boolean propagateApplicationUtilization, + boolean propagateAllNamedMetrics, + ImmutableSet namedMetricKeys) { + this.propagateCpuUtilization = propagateCpuUtilization; + this.propagateMemUtilization = propagateMemUtilization; + this.propagateApplicationUtilization = propagateApplicationUtilization; + this.propagateAllNamedMetrics = propagateAllNamedMetrics; + this.namedMetricKeys = checkNotNull(namedMetricKeys, "namedMetricKeys"); + } + + /** + * Creates a BackendMetricPropagation from a list of metric specifications. + * + * @param metricSpecs list of metric specification strings from CDS resource + * @return BackendMetricPropagation instance + */ + public static BackendMetricPropagation fromMetricSpecs(@Nullable java.util.List metricSpecs) { + if (metricSpecs == null || metricSpecs.isEmpty()) { + return new BackendMetricPropagation(false, false, false, false, ImmutableSet.of()); + } + + boolean propagateCpuUtilization = false; + boolean propagateMemUtilization = false; + boolean propagateApplicationUtilization = false; + boolean propagateAllNamedMetrics = false; + ImmutableSet.Builder namedMetricKeysBuilder = ImmutableSet.builder(); + for (String spec : metricSpecs) { + if (spec == null) { + continue; + } + switch (spec) { + case "cpu_utilization": + propagateCpuUtilization = true; + break; + case "mem_utilization": + propagateMemUtilization = true; + break; + case "application_utilization": + propagateApplicationUtilization = true; + break; + case "named_metrics.*": + propagateAllNamedMetrics = true; + break; + default: + if (spec.startsWith("named_metrics.")) { + String metricKey = spec.substring("named_metrics.".length()); + if (!metricKey.isEmpty()) { + namedMetricKeysBuilder.add(metricKey); + } + } + break; + } + } + + return new BackendMetricPropagation( + propagateCpuUtilization, + propagateMemUtilization, + propagateApplicationUtilization, + propagateAllNamedMetrics, + namedMetricKeysBuilder.build()); + } + + /** + * Returns whether the given named metric key should be propagated. + */ + public boolean shouldPropagateNamedMetric(String metricKey) { + return propagateAllNamedMetrics || namedMetricKeys.contains(metricKey); + } +} \ No newline at end of file diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java index c50f844d388..ffadaa8ae7b 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java @@ -141,7 +141,8 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { result.maxConcurrentRequests(), result.upstreamTlsContext(), result.filterMetadata(), - result.outlierDetection()); + result.outlierDetection(), + result.backendMetricPropagation()); } else { instance = DiscoveryMechanism.forLogicalDns( leafName, @@ -149,7 +150,8 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { result.lrsServerInfo(), result.maxConcurrentRequests(), result.upstreamTlsContext(), - result.filterMetadata()); + result.filterMetadata(), + result.backendMetricPropagation()); } instances.add(instance); } diff --git a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java index 034cdee0815..b1b27ebd9af 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java @@ -148,6 +148,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { childLbHelper.updateMaxConcurrentRequests(config.maxConcurrentRequests); childLbHelper.updateSslContextProviderSupplier(config.tlsContext); childLbHelper.updateFilterMetadata(config.filterMetadata); + childLbHelper.updateBackendMetricPropagation(config.backendMetricPropagation); childSwitchLb.handleResolvedAddresses( resolvedAddresses.toBuilder() @@ -208,6 +209,8 @@ private final class ClusterImplLbHelper extends ForwardingLoadBalancerHelper { private Map filterMetadata = ImmutableMap.of(); @Nullable private final ServerInfo lrsServerInfo; + @Nullable + private BackendMetricPropagation backendMetricPropagation; private ClusterImplLbHelper(AtomicLong inFlights, @Nullable ServerInfo lrsServerInfo) { this.inFlights = checkNotNull(inFlights, "inFlights"); @@ -320,7 +323,7 @@ private ClusterLocality createClusterLocalityFromAttributes(Attributes addressAt (lrsServerInfo == null) ? null : xdsClient.addClusterLocalityStats(lrsServerInfo, cluster, - edsServiceName, locality); + edsServiceName, locality, backendMetricPropagation); return new ClusterLocality(localityStats, localityName); } @@ -370,6 +373,11 @@ private void updateFilterMetadata(Map filterMetadata) { this.filterMetadata = ImmutableMap.copyOf(filterMetadata); } + private void updateBackendMetricPropagation( + @Nullable BackendMetricPropagation backendMetricPropagation) { + this.backendMetricPropagation = backendMetricPropagation; + } + private class RequestLimitingSubchannelPicker extends SubchannelPicker { private final SubchannelPicker delegate; private final List dropPolicies; @@ -505,11 +513,18 @@ private OrcaPerRpcListener(ClusterLocalityStats stats) { } /** - * Copies {@link MetricReport#getNamedMetrics()} to {@link ClusterLocalityStats} such that it is - * included in the snapshot for the LRS report sent to the LRS server. + * Copies ORCA metrics from {@link MetricReport} to {@link ClusterLocalityStats} + * such that they are included in the snapshot for the LRS report sent to the LRS server. + * This includes both top-level metrics (CPU, memory, application utilization) and named + * metrics, filtered according to the backend metric propagation configuration. */ @Override public void onLoadReport(MetricReport report) { + stats.recordTopLevelMetrics( + report.getCpuUtilization(), + report.getMemoryUtilization(), + report.getApplicationUtilization()); + stats.recordBackendLoadMetricStats(report.getNamedMetrics()); } } diff --git a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancerProvider.java index 4c9c14ba5f5..02f730ba58f 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancerProvider.java +++ b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancerProvider.java @@ -98,11 +98,14 @@ static final class ClusterImplConfig { // Provides the direct child policy and its config. final Object childConfig; final Map filterMetadata; + @Nullable + final BackendMetricPropagation backendMetricPropagation; ClusterImplConfig(String cluster, @Nullable String edsServiceName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, List dropCategories, Object childConfig, - @Nullable UpstreamTlsContext tlsContext, Map filterMetadata) { + @Nullable UpstreamTlsContext tlsContext, Map filterMetadata, + @Nullable BackendMetricPropagation backendMetricPropagation) { this.cluster = checkNotNull(cluster, "cluster"); this.edsServiceName = edsServiceName; this.lrsServerInfo = lrsServerInfo; @@ -112,6 +115,7 @@ static final class ClusterImplConfig { this.dropCategories = Collections.unmodifiableList( new ArrayList<>(checkNotNull(dropCategories, "dropCategories"))); this.childConfig = checkNotNull(childConfig, "childConfig"); + this.backendMetricPropagation = backendMetricPropagation; } @Override diff --git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java index 080760303bf..8e1108f1b6c 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java @@ -191,11 +191,11 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { if (instance.type == DiscoveryMechanism.Type.EDS) { state = new EdsClusterState(instance.cluster, instance.edsServiceName, instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext, - instance.filterMetadata, instance.outlierDetection); + instance.filterMetadata, instance.outlierDetection, instance.backendMetricPropagation); } else { // logical DNS state = new LogicalDnsClusterState(instance.cluster, instance.dnsHostName, instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext, - instance.filterMetadata); + instance.filterMetadata, instance.backendMetricPropagation); } clusterStates.put(instance.cluster, state); state.start(); @@ -334,6 +334,8 @@ private abstract class ClusterState { protected final Map filterMetadata; @Nullable protected final OutlierDetection outlierDetection; + @Nullable + protected final BackendMetricPropagation backendMetricPropagation; // Resolution status, may contain most recent error encountered. protected Status status = Status.OK; // True if has received resolution result. @@ -346,13 +348,15 @@ private abstract class ClusterState { private ClusterState(String name, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, - Map filterMetadata, @Nullable OutlierDetection outlierDetection) { + Map filterMetadata, @Nullable OutlierDetection outlierDetection, + @Nullable BackendMetricPropagation backendMetricPropagation) { this.name = name; this.lrsServerInfo = lrsServerInfo; this.maxConcurrentRequests = maxConcurrentRequests; this.tlsContext = tlsContext; this.filterMetadata = ImmutableMap.copyOf(filterMetadata); this.outlierDetection = outlierDetection; + this.backendMetricPropagation = backendMetricPropagation; } abstract void start(); @@ -371,9 +375,10 @@ private final class EdsClusterState extends ClusterState implements ResourceWatc private EdsClusterState(String name, @Nullable String edsServiceName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, Map filterMetadata, - @Nullable OutlierDetection outlierDetection) { + @Nullable OutlierDetection outlierDetection, + @Nullable BackendMetricPropagation backendMetricPropagation) { super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, - outlierDetection); + outlierDetection, backendMetricPropagation); this.edsServiceName = edsServiceName; } @@ -470,8 +475,8 @@ public void run() { Map priorityChildConfigs = generateEdsBasedPriorityChildConfigs( name, edsServiceName, lrsServerInfo, maxConcurrentRequests, tlsContext, - filterMetadata, outlierDetection, endpointLbConfig, lbRegistry, - prioritizedLocalityWeights, dropOverloads); + filterMetadata, backendMetricPropagation, outlierDetection, + endpointLbConfig, lbRegistry, prioritizedLocalityWeights, dropOverloads); status = Status.OK; resolved = true; result = new ClusterResolutionResult(addresses, priorityChildConfigs, @@ -585,8 +590,10 @@ private final class LogicalDnsClusterState extends ClusterState { private LogicalDnsClusterState(String name, String dnsHostName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, - @Nullable UpstreamTlsContext tlsContext, Map filterMetadata) { - super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, null); + @Nullable UpstreamTlsContext tlsContext, Map filterMetadata, + @Nullable BackendMetricPropagation backendMetricPropagation) { + super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, + filterMetadata, null, backendMetricPropagation); this.dnsHostName = checkNotNull(dnsHostName, "dnsHostName"); nameResolverFactory = checkNotNull(helper.getNameResolverRegistry().asFactory(), "nameResolverFactory"); @@ -688,7 +695,7 @@ public Status onResult2(final ResolutionResult resolutionResult) { } PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig( name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, - lbRegistry, Collections.emptyList()); + backendMetricPropagation, lbRegistry, Collections.emptyList()); status = Status.OK; resolved = true; result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig); @@ -772,13 +779,14 @@ private static class ClusterResolutionResult { private static PriorityChildConfig generateDnsBasedPriorityChildConfig( String cluster, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, Map filterMetadata, + @Nullable BackendMetricPropagation backendMetricPropagation, LoadBalancerRegistry lbRegistry, List dropOverloads) { // Override endpoint-level LB policy with pick_first for logical DNS cluster. Object endpointLbConfig = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( lbRegistry.getProvider("pick_first"), null); ClusterImplConfig clusterImplConfig = new ClusterImplConfig(cluster, null, lrsServerInfo, maxConcurrentRequests, - dropOverloads, endpointLbConfig, tlsContext, filterMetadata); + dropOverloads, endpointLbConfig, tlsContext, filterMetadata, backendMetricPropagation); LoadBalancerProvider clusterImplLbProvider = lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME); Object clusterImplPolicy = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( @@ -795,7 +803,7 @@ private static PriorityChildConfig generateDnsBasedPriorityChildConfig( private static Map generateEdsBasedPriorityChildConfigs( String cluster, @Nullable String edsServiceName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, - Map filterMetadata, + Map filterMetadata, @Nullable BackendMetricPropagation backendMetricPropagation, @Nullable OutlierDetection outlierDetection, Object endpointLbConfig, LoadBalancerRegistry lbRegistry, Map> prioritizedLocalityWeights, List dropOverloads) { @@ -803,7 +811,7 @@ private static Map generateEdsBasedPriorityChildCon for (String priority : prioritizedLocalityWeights.keySet()) { ClusterImplConfig clusterImplConfig = new ClusterImplConfig(cluster, edsServiceName, lrsServerInfo, maxConcurrentRequests, - dropOverloads, endpointLbConfig, tlsContext, filterMetadata); + dropOverloads, endpointLbConfig, tlsContext, filterMetadata, backendMetricPropagation); LoadBalancerProvider clusterImplLbProvider = lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME); Object priorityChildPolicy = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( diff --git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java index b5dcb271368..68fab7042e0 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java +++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java @@ -137,6 +137,8 @@ static final class DiscoveryMechanism { @Nullable final OutlierDetection outlierDetection; final Map filterMetadata; + @Nullable + final BackendMetricPropagation backendMetricPropagation; enum Type { EDS, @@ -146,7 +148,8 @@ enum Type { private DiscoveryMechanism(String cluster, Type type, @Nullable String edsServiceName, @Nullable String dnsHostName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, - Map filterMetadata, @Nullable OutlierDetection outlierDetection) { + Map filterMetadata, @Nullable OutlierDetection outlierDetection, + @Nullable BackendMetricPropagation backendMetricPropagation) { this.cluster = checkNotNull(cluster, "cluster"); this.type = checkNotNull(type, "type"); this.edsServiceName = edsServiceName; @@ -156,27 +159,30 @@ private DiscoveryMechanism(String cluster, Type type, @Nullable String edsServic this.tlsContext = tlsContext; this.filterMetadata = ImmutableMap.copyOf(checkNotNull(filterMetadata, "filterMetadata")); this.outlierDetection = outlierDetection; + this.backendMetricPropagation = backendMetricPropagation; } static DiscoveryMechanism forEds(String cluster, @Nullable String edsServiceName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, Map filterMetadata, - OutlierDetection outlierDetection) { + OutlierDetection outlierDetection, @Nullable BackendMetricPropagation backendMetricPropagation) { return new DiscoveryMechanism(cluster, Type.EDS, edsServiceName, null, lrsServerInfo, - maxConcurrentRequests, tlsContext, filterMetadata, outlierDetection); + maxConcurrentRequests, tlsContext, filterMetadata, outlierDetection, backendMetricPropagation); } static DiscoveryMechanism forLogicalDns(String cluster, String dnsHostName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, - @Nullable UpstreamTlsContext tlsContext, Map filterMetadata) { + @Nullable UpstreamTlsContext tlsContext, Map filterMetadata, + @Nullable BackendMetricPropagation backendMetricPropagation) { return new DiscoveryMechanism(cluster, Type.LOGICAL_DNS, null, dnsHostName, - lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, null); + lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, null, + backendMetricPropagation); } @Override public int hashCode() { return Objects.hash(cluster, type, lrsServerInfo, maxConcurrentRequests, tlsContext, - edsServiceName, dnsHostName, filterMetadata, outlierDetection); + edsServiceName, dnsHostName, filterMetadata, outlierDetection, backendMetricPropagation); } @Override diff --git a/xds/src/main/java/io/grpc/xds/XdsClusterResource.java b/xds/src/main/java/io/grpc/xds/XdsClusterResource.java index a5220515b6c..b6938c27dec 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClusterResource.java +++ b/xds/src/main/java/io/grpc/xds/XdsClusterResource.java @@ -50,6 +50,7 @@ import io.grpc.xds.client.XdsClient.ResourceUpdate; import io.grpc.xds.client.XdsResourceType; import io.grpc.xds.internal.security.CommonTlsContextUtil; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Set; @@ -67,6 +68,9 @@ class XdsClusterResource extends XdsResourceType { GrpcUtil.getFlag("GRPC_EXPERIMENTAL_XDS_SYSTEM_ROOT_CERTS", false); static boolean isEnabledXdsHttpConnect = GrpcUtil.getFlag("GRPC_EXPERIMENTAL_XDS_HTTP_CONNECT", false); + @VisibleForTesting + static boolean isEnabledOrcaLrsPropagation = + GrpcUtil.getFlag("GRPC_EXPERIMENTAL_XDS_ORCA_LRS_PROPAGATION", false); @VisibleForTesting static final String AGGREGATE_CLUSTER_TYPE_NAME = "envoy.clusters.aggregate"; @@ -227,6 +231,12 @@ private static StructOrError parseNonAggregateCluster( UpstreamTlsContext upstreamTlsContext = null; OutlierDetection outlierDetection = null; boolean isHttp11ProxyAvailable = false; + BackendMetricPropagation backendMetricPropagation = null; + + if (isEnabledOrcaLrsPropagation) { + backendMetricPropagation = BackendMetricPropagation.fromMetricSpecs( + cluster.getLrsReportEndpointMetricsList()); + } if (cluster.hasLrsServer()) { if (!cluster.getLrsServer().hasSelf()) { return StructOrError.fromError( @@ -326,7 +336,7 @@ private static StructOrError parseNonAggregateCluster( return StructOrError.fromStruct(CdsUpdate.forEds( clusterName, edsServiceName, lrsServerInfo, maxConcurrentRequests, upstreamTlsContext, - outlierDetection, isHttp11ProxyAvailable)); + outlierDetection, isHttp11ProxyAvailable, backendMetricPropagation)); } else if (type.equals(Cluster.DiscoveryType.LOGICAL_DNS)) { if (!cluster.hasLoadAssignment()) { return StructOrError.fromError( @@ -362,7 +372,7 @@ private static StructOrError parseNonAggregateCluster( Locale.US, "%s:%d", socketAddress.getAddress(), socketAddress.getPortValue()); return StructOrError.fromStruct(CdsUpdate.forLogicalDns( clusterName, dnsHostName, lrsServerInfo, maxConcurrentRequests, - upstreamTlsContext, isHttp11ProxyAvailable)); + upstreamTlsContext, isHttp11ProxyAvailable, backendMetricPropagation)); } return StructOrError.fromError( "Cluster " + clusterName + ": unsupported built-in discovery type: " + type); @@ -614,6 +624,9 @@ abstract static class CdsUpdate implements ResourceUpdate { abstract ImmutableMap parsedMetadata(); + @Nullable + abstract BackendMetricPropagation backendMetricPropagation(); + private static Builder newBuilder(String clusterName) { return new AutoValue_XdsClusterResource_CdsUpdate.Builder() .clusterName(clusterName) @@ -622,7 +635,8 @@ private static Builder newBuilder(String clusterName) { .choiceCount(0) .filterMetadata(ImmutableMap.of()) .parsedMetadata(ImmutableMap.of()) - .isHttp11ProxyAvailable(false); + .isHttp11ProxyAvailable(false) + .backendMetricPropagation(null); } static Builder forAggregate(String clusterName, List prioritizedClusterNames) { @@ -636,7 +650,8 @@ static Builder forEds(String clusterName, @Nullable String edsServiceName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext upstreamTlsContext, @Nullable OutlierDetection outlierDetection, - boolean isHttp11ProxyAvailable) { + boolean isHttp11ProxyAvailable, + BackendMetricPropagation backendMetricPropagation) { return newBuilder(clusterName) .clusterType(ClusterType.EDS) .edsServiceName(edsServiceName) @@ -644,21 +659,24 @@ static Builder forEds(String clusterName, @Nullable String edsServiceName, .maxConcurrentRequests(maxConcurrentRequests) .upstreamTlsContext(upstreamTlsContext) .outlierDetection(outlierDetection) - .isHttp11ProxyAvailable(isHttp11ProxyAvailable); + .isHttp11ProxyAvailable(isHttp11ProxyAvailable) + .backendMetricPropagation(backendMetricPropagation); } static Builder forLogicalDns(String clusterName, String dnsHostName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext upstreamTlsContext, - boolean isHttp11ProxyAvailable) { + boolean isHttp11ProxyAvailable, + BackendMetricPropagation backendMetricPropagation) { return newBuilder(clusterName) .clusterType(ClusterType.LOGICAL_DNS) .dnsHostName(dnsHostName) .lrsServerInfo(lrsServerInfo) .maxConcurrentRequests(maxConcurrentRequests) .upstreamTlsContext(upstreamTlsContext) - .isHttp11ProxyAvailable(isHttp11ProxyAvailable); + .isHttp11ProxyAvailable(isHttp11ProxyAvailable) + .backendMetricPropagation(backendMetricPropagation); } enum ClusterType { @@ -749,6 +767,9 @@ Builder leastRequestLbPolicy(Integer choiceCount) { protected abstract Builder parsedMetadata(ImmutableMap parsedMetadata); + protected abstract Builder backendMetricPropagation( + BackendMetricPropagation backendMetricPropagation); + abstract CdsUpdate build(); } } diff --git a/xds/src/main/java/io/grpc/xds/client/LoadStatsManager2.java b/xds/src/main/java/io/grpc/xds/client/LoadStatsManager2.java index be9d3587d14..76e7c48ee91 100644 --- a/xds/src/main/java/io/grpc/xds/client/LoadStatsManager2.java +++ b/xds/src/main/java/io/grpc/xds/client/LoadStatsManager2.java @@ -25,6 +25,7 @@ import com.google.common.collect.Sets; import io.grpc.Internal; import io.grpc.Status; +import io.grpc.xds.BackendMetricPropagation; import io.grpc.xds.client.Stats.BackendLoadMetricStats; import io.grpc.xds.client.Stats.ClusterStats; import io.grpc.xds.client.Stats.DroppedRequests; @@ -98,13 +99,20 @@ private synchronized void releaseClusterDropCounter( /** * Gets or creates the stats object for recording loads for the specified locality (in the - * specified cluster with edsServiceName). The returned object is reference counted and the - * caller should use {@link ClusterLocalityStats#release} to release its hard reference + * specified cluster with edsServiceName) with the specified backend metric propagation + * configuration. The returned object is reference counted and the caller should + * use {@link ClusterLocalityStats#release} to release its hard reference * when it is safe to discard the future stats for the locality. */ @VisibleForTesting public synchronized ClusterLocalityStats getClusterLocalityStats( String cluster, @Nullable String edsServiceName, Locality locality) { + return getClusterLocalityStats(cluster, edsServiceName, locality, null); + } + + public synchronized ClusterLocalityStats getClusterLocalityStats( + String cluster, @Nullable String edsServiceName, Locality locality, + @Nullable BackendMetricPropagation backendMetricPropagation) { if (!allLoadStats.containsKey(cluster)) { allLoadStats.put( cluster, @@ -122,7 +130,7 @@ public synchronized ClusterLocalityStats getClusterLocalityStats( localityStats.put( locality, ReferenceCounted.wrap(new ClusterLocalityStats( - cluster, edsServiceName, locality, stopwatchSupplier.get()))); + cluster, edsServiceName, locality, stopwatchSupplier.get(), backendMetricPropagation))); } ReferenceCounted ref = localityStats.get(locality); ref.retain(); @@ -325,6 +333,8 @@ public final class ClusterLocalityStats { private final String edsServiceName; private final Locality locality; private final Stopwatch stopwatch; + @Nullable + private final BackendMetricPropagation backendMetricPropagation; private final AtomicLong callsInProgress = new AtomicLong(); private final AtomicLong callsSucceeded = new AtomicLong(); private final AtomicLong callsFailed = new AtomicLong(); @@ -333,11 +343,12 @@ public final class ClusterLocalityStats { private ClusterLocalityStats( String clusterName, @Nullable String edsServiceName, Locality locality, - Stopwatch stopwatch) { + Stopwatch stopwatch, BackendMetricPropagation backendMetricPropagation) { this.clusterName = checkNotNull(clusterName, "clusterName"); this.edsServiceName = edsServiceName; this.locality = checkNotNull(locality, "locality"); this.stopwatch = checkNotNull(stopwatch, "stopwatch"); + this.backendMetricPropagation = backendMetricPropagation; stopwatch.reset().start(); } @@ -367,17 +378,87 @@ public void recordCallFinished(Status status) { * requests counter of 1 and the {@code value} if the key is not present in the map. Otherwise, * increments the finished requests counter and adds the {@code value} to the existing * {@link BackendLoadMetricStats}. + * Metrics are filtered based on the backend metric propagation configuration if configured. */ public synchronized void recordBackendLoadMetricStats(Map namedMetrics) { + // If no propagation configuration is set, use the old behavior (propagate everything) + // Otherwise, filter based on the configuration namedMetrics.forEach((name, value) -> { - if (!loadMetricStatsMap.containsKey(name)) { - loadMetricStatsMap.put(name, new BackendLoadMetricStats(1, value)); - } else { - loadMetricStatsMap.get(name).addMetricValueAndIncrementRequestsFinished(value); + if (backendMetricPropagation == null + || backendMetricPropagation.shouldPropagateNamedMetric(name)) { + String prefixedName = (backendMetricPropagation == null) ? name : "named_metrics." + name; + if (!loadMetricStatsMap.containsKey(prefixedName)) { + loadMetricStatsMap.put(prefixedName, new BackendLoadMetricStats(1, value)); + } else { + loadMetricStatsMap.get(prefixedName).addMetricValueAndIncrementRequestsFinished(value); + } } }); } + /** + * Records top-level ORCA metrics (CPU, memory, application utilization) for per-call load + * reporting. Metrics are filtered based on the backend metric propagation configuration + * if configured. + * + * @param cpuUtilization CPU utilization metric value + * @param memUtilization Memory utilization metric value + * @param applicationUtilization Application utilization metric value + */ + public synchronized void recordTopLevelMetrics(double cpuUtilization, double memUtilization, + double applicationUtilization) { + // If no propagation configuration is set, use the old behavior (propagate everything) + // Otherwise, filter based on the configuration + + if (cpuUtilization > 0) { + boolean shouldPropagate = true; + if (backendMetricPropagation != null) { + shouldPropagate = backendMetricPropagation.propagateCpuUtilization; + } + + if (shouldPropagate) { + String metricName = "cpu_utilization"; + if (!loadMetricStatsMap.containsKey(metricName)) { + loadMetricStatsMap.put(metricName, new BackendLoadMetricStats(1, cpuUtilization)); + } else { + loadMetricStatsMap.get(metricName).addMetricValueAndIncrementRequestsFinished(cpuUtilization); + } + } + } + + if (memUtilization > 0) { + boolean shouldPropagate = true; + if (backendMetricPropagation != null) { + shouldPropagate = backendMetricPropagation.propagateMemUtilization; + } + + if (shouldPropagate) { + String metricName = "mem_utilization"; + if (!loadMetricStatsMap.containsKey(metricName)) { + loadMetricStatsMap.put(metricName, new BackendLoadMetricStats(1, memUtilization)); + } else { + loadMetricStatsMap.get(metricName).addMetricValueAndIncrementRequestsFinished(memUtilization); + } + } + } + + if (applicationUtilization > 0) { + boolean shouldPropagate = true; + if (backendMetricPropagation != null) { + shouldPropagate = backendMetricPropagation.propagateApplicationUtilization; + } + + if (shouldPropagate) { + String metricName = "application_utilization"; + if (!loadMetricStatsMap.containsKey(metricName)) { + loadMetricStatsMap.put(metricName, new BackendLoadMetricStats(1, applicationUtilization)); + } else { + loadMetricStatsMap.get(metricName).addMetricValueAndIncrementRequestsFinished(applicationUtilization); + } + } + } + } + /** * Release the hard reference for this stats object (previously obtained via {@link * LoadStatsManager2#getClusterLocalityStats}). The object may still be diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClient.java b/xds/src/main/java/io/grpc/xds/client/XdsClient.java index edbb0b2d74c..ac732e8a6e2 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsClient.java @@ -27,6 +27,7 @@ import com.google.protobuf.Any; import io.grpc.ExperimentalApi; import io.grpc.Status; +import io.grpc.xds.BackendMetricPropagation; import io.grpc.xds.client.Bootstrapper.ServerInfo; import java.net.URI; import java.net.URISyntaxException; @@ -387,6 +388,23 @@ public LoadStatsManager2.ClusterLocalityStats addClusterLocalityStats( throw new UnsupportedOperationException(); } + /** + * Adds load stats for the specified locality (in the specified cluster with edsServiceName) by + * using the returned object to record RPCs. Load stats recorded with the returned object will + * be reported to the load reporting server. The returned object is reference counted and the + * caller should use {@link LoadStatsManager2.ClusterLocalityStats#release} to release its + * hard reference when it is safe to stop reporting RPC loads for the specified locality + * in the future. + * + * @param backendMetricPropagation Configuration for which backend metrics should be propagated + * to LRS load reports. If null, all metrics will be propagated (legacy behavior). + */ + public LoadStatsManager2.ClusterLocalityStats addClusterLocalityStats( + Bootstrapper.ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName, + Locality locality, @Nullable BackendMetricPropagation backendMetricPropagation) { + throw new UnsupportedOperationException(); + } + /** * Returns a map of control plane server info objects to the LoadReportClients that are * responsible for sending load reports to the control plane servers. diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java index 2b25d4db977..2c004c5ab83 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java @@ -37,6 +37,7 @@ import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.internal.BackoffPolicy; import io.grpc.internal.TimeProvider; +import io.grpc.xds.BackendMetricPropagation; import io.grpc.xds.client.Bootstrapper.AuthorityInfo; import io.grpc.xds.client.Bootstrapper.ServerInfo; import io.grpc.xds.client.XdsClient.ResourceStore; @@ -420,6 +421,29 @@ public void run() { return loadCounter; } + @Override + public LoadStatsManager2.ClusterLocalityStats addClusterLocalityStats( + final ServerInfo serverInfo, + String clusterName, + @Nullable String edsServiceName, + Locality locality, + @Nullable BackendMetricPropagation backendMetricPropagation) { + LoadStatsManager2 loadStatsManager = loadStatsManagerMap.get(serverInfo); + + LoadStatsManager2.ClusterLocalityStats loadCounter = + loadStatsManager.getClusterLocalityStats( + clusterName, edsServiceName, locality, backendMetricPropagation); + + syncContext.execute(new Runnable() { + @Override + public void run() { + serverLrsClientMap.get(serverInfo).startLoadReporting(); + } + }); + + return loadCounter; + } + @Override public Bootstrapper.BootstrapInfo getBootstrapInfo() { diff --git a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java index 7df0630b779..3411bdd2915 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java @@ -191,7 +191,7 @@ public void handleResolvedAddresses_propagateToChildPolicy() { null, Collections.emptyList(), GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( weightedTargetProvider, weightedTargetConfig), - null, Collections.emptyMap()); + null, Collections.emptyMap(), null); EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality); deliverAddressesAndConfig(Collections.singletonList(endpoint), config); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(downstreamBalancers); @@ -219,7 +219,7 @@ public void handleResolvedAddresses_childPolicyChanges() { null, Collections.emptyList(), GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( weightedTargetProvider, weightedTargetConfig), - null, Collections.emptyMap()); + null, Collections.emptyMap(), null); EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality); deliverAddressesAndConfig(Collections.singletonList(endpoint), configWithWeightedTarget); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(downstreamBalancers); @@ -234,7 +234,7 @@ public void handleResolvedAddresses_childPolicyChanges() { null, Collections.emptyList(), GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( wrrLocalityProvider, wrrLocalityConfig), - null, Collections.emptyMap()); + null, Collections.emptyMap(), null); deliverAddressesAndConfig(Collections.singletonList(endpoint), configWithWrrLocality); childBalancer = Iterables.getOnlyElement(downstreamBalancers); assertThat(childBalancer.name).isEqualTo(XdsLbPolicies.WRR_LOCALITY_POLICY_NAME);