Skip to content

feat: add the missing xds.authority label #12018

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Apr 22, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ final class XdsClientMetricReporterImpl implements XdsClientMetricReporter {
Arrays.asList("grpc.target", "grpc.xds.server"), Collections.emptyList(), false);
RESOURCES_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.xds_client.resources",
"EXPERIMENTAL. Number of xDS resources.", "{resource}",
Arrays.asList("grpc.target", "grpc.xds.cache_state",
Arrays.asList("grpc.target", "grpc.xds.authority", "grpc.xds.cache_state",
"grpc.xds.resource_type"), Collections.emptyList(), false);
}

Expand Down Expand Up @@ -143,7 +143,13 @@ void reportCallbackMetrics(BatchRecorder recorder, XdsClient xdsClient) {
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType =
getResourceMetadataCompleted.get(10, TimeUnit.SECONDS);

computeAndReportResourceCounts(metadataByType, callback);
ListenableFuture<Map<XdsResourceType<?>, Map<String, String>>>
getResourceAuthorityCompleted = xdsClient.getSubscribedResourcesAuthoritySnapshot();

Map<XdsResourceType<?>, Map<String, String>> authorityByType =
getResourceAuthorityCompleted.get(10, TimeUnit.SECONDS);

computeAndReportResourceCounts(metadataByType, authorityByType, callback);

// Normally this shouldn't take long, but adding a timeout to avoid indefinite blocking
Void unused = reportServerConnectionsCompleted.get(5, TimeUnit.SECONDS);
Expand All @@ -157,19 +163,27 @@ void reportCallbackMetrics(BatchRecorder recorder, XdsClient xdsClient) {

private void computeAndReportResourceCounts(
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType,
Map<XdsResourceType<?>, Map<String, String>> authorityByType,
MetricReporterCallback callback) {
for (Map.Entry<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByTypeEntry :
metadataByType.entrySet()) {
XdsResourceType<?> type = metadataByTypeEntry.getKey();

Map<String, Long> resourceCountsByState = new HashMap<>();
for (ResourceMetadata metadata : metadataByTypeEntry.getValue().values()) {
Map<String, String> authorityByState = new HashMap<>();
for (Map.Entry<String, ResourceMetadata> metadataByName :
metadataByTypeEntry.getValue().entrySet()) {
String resourceName = metadataByName.getKey();
ResourceMetadata metadata = metadataByName.getValue();
String cacheState = cacheStateFromResourceStatus(metadata.getStatus(), metadata.isCached());
resourceCountsByState.compute(cacheState, (k, v) -> (v == null) ? 1 : v + 1);
authorityByState.put(cacheState, authorityByType.get(type).get(resourceName));
}

resourceCountsByState.forEach((cacheState, count) ->
callback.reportResourceCountGauge(count, cacheState, type.typeUrl()));
resourceCountsByState.forEach((cacheState, count) -> {
callback.reportResourceCountGauge(authorityByState.get(cacheState),
count, cacheState, type.typeUrl());
});
}
}

Expand Down Expand Up @@ -200,10 +214,11 @@ static final class MetricReporterCallback implements ServerConnectionCallback {
}

// Records one value of the RESOURCES_GAUGE long gauge on the recorder.
// In the variant that takes an authority, the required label values are passed in the order
// target, authority, cacheState, resourceType, and a null authority is recorded as the
// literal "#old". No optional label values are passed (empty list).
// NOTE(review): this span shows both the previous signature/label list (without authority)
// and the one that replaces it; the authority-taking lines are presumably the live ones.
void reportResourceCountGauge(long resourceCount, String cacheState,
void reportResourceCountGauge(String authority, long resourceCount, String cacheState,
String resourceType) {
recorder.recordLongGauge(RESOURCES_GAUGE, resourceCount,
Arrays.asList(target, cacheState, resourceType), Collections.emptyList());
Arrays.asList(target, authority == null ? "#old" : authority,
cacheState, resourceType), Collections.emptyList());
}

@Override
Expand Down
13 changes: 13 additions & 0 deletions xds/src/main/java/io/grpc/xds/client/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,19 @@
throw new UnsupportedOperationException();
}

/**
* Returns a {@link ListenableFuture} to the snapshot of the subscribed resources as
* they are at the moment of the call.
*
* <p>The snapshot is a map from the "resource type" to
* a map ("resource name": "authority").
*
* <p>This implementation does not produce a snapshot: it unconditionally throws.
* NOTE(review): concrete clients are presumably expected to override it - confirm.
*
* @return a future for the map of resource type to (resource name to authority)
* @throws UnsupportedOperationException always, in this implementation
*/
// Must be synchronized.
public ListenableFuture<Map<XdsResourceType<?>, Map<String, String>>>
getSubscribedResourcesAuthoritySnapshot() {
throw new UnsupportedOperationException();

Check warning on line 332 in xds/src/main/java/io/grpc/xds/client/XdsClient.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/client/XdsClient.java#L332

Added line #L332 was not covered by tests
}

/**
* Registers a data watcher for the given Xds resource.
*/
Expand Down
27 changes: 27 additions & 0 deletions xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,33 @@
return future;
}

// Returns a future that is completed, from a task submitted to syncContext, with a snapshot
// mapping each subscribed resource type to a map of resource name -> authority.
// As XdsClient APIs become resource agnostic, subscribed resource types are dynamic.
// Resource types that do not have subscribers do not show up in the snapshot keys.
@Override
public ListenableFuture<Map<XdsResourceType<?>, Map<String, String>>>
getSubscribedResourcesAuthoritySnapshot() {
final SettableFuture<Map<XdsResourceType<?>, Map<String, String>>> future =
SettableFuture.create();
// resourceSubscribers is only read inside the task handed to syncContext.
syncContext.execute(new Runnable() {

Check warning on line 252 in xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java#L251-L252

Added lines #L251 - L252 were not covered by tests
@Override
public void run() {
// A map from a "resource type" to a map ("resource name": "authority")
ImmutableMap.Builder<XdsResourceType<?>, Map<String, String>> authoritySnapshot =
ImmutableMap.builder();

Check warning on line 257 in xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java#L257

Added line #L257 was not covered by tests
for (XdsResourceType<?> resourceType : resourceSubscribers.keySet()) {
ImmutableMap.Builder<String, String> authorityMap = ImmutableMap.builder();

Check warning on line 259 in xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java#L259

Added line #L259 was not covered by tests
for (Map.Entry<String, ResourceSubscriber<? extends ResourceUpdate>> resourceEntry
: resourceSubscribers.get(resourceType).entrySet()) {
// NOTE(review): ImmutableMap.Builder rejects null values. The metric reporter treats a null
// authority as "#old", which suggests the subscriber's authority may be null; if so this
// put would throw and the future would never be completed - confirm the field's nullability.
authorityMap.put(resourceEntry.getKey(), resourceEntry.getValue().authority);
}
authoritySnapshot.put(resourceType, authorityMap.buildOrThrow());
}
// Completes the future with the immutable snapshot built above.
future.set(authoritySnapshot.buildOrThrow());
}

Check warning on line 267 in xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java#L262-L267

Added lines #L262 - L267 were not covered by tests
});
return future;

Check warning on line 269 in xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java#L269

Added line #L269 was not covered by tests
}

@Override
public Object getSecurityConfig() {
return securityConfig;
Expand Down
58 changes: 48 additions & 10 deletions xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
public class XdsClientMetricReporterImplTest {

private static final String target = "test-target";
private static final String authority = "test-authority";
private static final String server = "trafficdirector.googleapis.com";
private static final String resourceTypeUrl =
"resourceTypeUrl.googleapis.com/envoy.config.cluster.v3.Cluster";
Expand All @@ -101,7 +102,6 @@ public void setUp() {

@Test
public void reportResourceUpdates() {
// TODO(dnvindhya): add the "authority" label once available.
reporter.reportResourceUpdates(10, 5, server, resourceTypeUrl);
verify(mockMetricRecorder).addLongCounter(
eqMetricInstrumentName("grpc.xds_client.resource_updates_valid"), eq((long) 10),
Expand Down Expand Up @@ -129,6 +129,8 @@ public void setXdsClient_reportMetrics() throws Exception {
future.set(null);
when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()).thenReturn(Futures.immediateFuture(
ImmutableMap.of()));
when(mockXdsClient.getSubscribedResourcesAuthoritySnapshot())
.thenReturn(Futures.immediateFuture(ImmutableMap.of()));
when(mockXdsClient.reportServerConnections(any(ServerConnectionCallback.class)))
.thenReturn(future);
reporter.setXdsClient(mockXdsClient);
Expand All @@ -150,6 +152,8 @@ public void setXdsClient_reportCallbackMetrics_resourceCountsFails() {
future.set(null);
when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()).thenReturn(Futures.immediateFuture(
ImmutableMap.of()));
when(mockXdsClient.getSubscribedResourcesAuthoritySnapshot())
.thenReturn(Futures.immediateFuture(ImmutableMap.of()));

// Create a future that will throw an exception
SettableFuture<Void> serverConnectionsFeature = SettableFuture.create();
Expand Down Expand Up @@ -177,6 +181,8 @@ public void metricGauges() {
future.set(null);
when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()).thenReturn(Futures.immediateFuture(
ImmutableMap.of()));
when(mockXdsClient.getSubscribedResourcesAuthoritySnapshot())
.thenReturn(Futures.immediateFuture(ImmutableMap.of()));
when(mockXdsClient.reportServerConnections(any(ServerConnectionCallback.class)))
.thenReturn(future);
reporter.setXdsClient(mockXdsClient);
Expand All @@ -199,7 +205,7 @@ public void metricGauges() {

// Verify that reportResourceCounts and reportServerConnections were called
// with the captured callback
callback.reportResourceCountGauge(10, "acked", resourceTypeUrl);
callback.reportResourceCountGauge("PotatoHead", 10, "acked", resourceTypeUrl);
inOrder.verify(mockBatchRecorder)
.recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), eq(10L), any(),
any());
Expand All @@ -222,16 +228,17 @@ public void metricReporterCallback() {
eq(Lists.newArrayList()));

String cacheState = "requested";
callback.reportResourceCountGauge(10, cacheState, resourceTypeUrl);
callback.reportResourceCountGauge(authority, 10, cacheState, resourceTypeUrl);
verify(mockBatchRecorder, times(1)).recordLongGauge(
eqMetricInstrumentName("grpc.xds_client.resources"), eq(10L),
eq(Arrays.asList(target, cacheState, resourceTypeUrl)),
eq(Arrays.asList(target, authority, cacheState, resourceTypeUrl)),
eq(Collections.emptyList()));
}

@Test
public void reportCallbackMetrics_computeAndReportResourceCounts() {
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType = new HashMap<>();
Map<XdsResourceType<?>, Map<String, String>> authorityByType = new HashMap<>();
XdsResourceType<?> listenerResource = XdsListenerResource.getInstance();
XdsResourceType<?> routeConfigResource = XdsRouteConfigureResource.getInstance();
XdsResourceType<?> clusterResource = XdsClusterResource.getInstance();
Expand All @@ -241,31 +248,44 @@ public void reportCallbackMetrics_computeAndReportResourceCounts() {
long nanosLastUpdate = 1577923199_606042047L;

Map<String, ResourceMetadata> ldsResourceMetadataMap = new HashMap<>();
Map<String, String> ldsAuthorityMap = new HashMap<>();
ldsResourceMetadataMap.put("resource1",
ResourceMetadata.newResourceMetadataRequested());
ldsAuthorityMap.put("resource1", "authority1");
ResourceMetadata ackedLdsResource = ResourceMetadata.newResourceMetadataAcked(rawListener, "42",
nanosLastUpdate);
ldsResourceMetadataMap.put("resource2", ackedLdsResource);
ldsAuthorityMap.put("resource2", "authority2");
ldsResourceMetadataMap.put("resource3",
ResourceMetadata.newResourceMetadataAcked(rawListener, "43", nanosLastUpdate));
ldsAuthorityMap.put("resource3", "authority3");
ldsResourceMetadataMap.put("resource4",
ResourceMetadata.newResourceMetadataNacked(ackedLdsResource, "44", nanosLastUpdate,
"nacked after previous ack", true));
ldsAuthorityMap.put("resource4", "authority4");

Map<String, ResourceMetadata> rdsResourceMetadataMap = new HashMap<>();
Map<String, String> rdsAuthorityMap = new HashMap<>();
ResourceMetadata requestedRdsResourceMetadata = ResourceMetadata.newResourceMetadataRequested();
rdsResourceMetadataMap.put("resource5",
ResourceMetadata.newResourceMetadataNacked(requestedRdsResourceMetadata, "24",
nanosLastUpdate, "nacked after request", false));
rdsAuthorityMap.put("resource5", "authority5");
rdsResourceMetadataMap.put("resource6",
ResourceMetadata.newResourceMetadataDoesNotExist());
rdsAuthorityMap.put("resource6", "authority6");

Map<String, ResourceMetadata> cdsResourceMetadataMap = new HashMap<>();
Map<String, String> cdsAuthorityMap = new HashMap<>();
cdsResourceMetadataMap.put("resource7", ResourceMetadata.newResourceMetadataUnknown());
cdsAuthorityMap.put("resource7", "authority7");

metadataByType.put(listenerResource, ldsResourceMetadataMap);
authorityByType.put(listenerResource, ldsAuthorityMap);
metadataByType.put(routeConfigResource, rdsResourceMetadataMap);
authorityByType.put(routeConfigResource, rdsAuthorityMap);
metadataByType.put(clusterResource, cdsResourceMetadataMap);
authorityByType.put(clusterResource, cdsAuthorityMap);

SettableFuture<Void> reportServerConnectionsCompleted = SettableFuture.create();
reportServerConnectionsCompleted.set(null);
Expand All @@ -277,36 +297,49 @@ public void reportCallbackMetrics_computeAndReportResourceCounts() {
when(mockXdsClient.getSubscribedResourcesMetadataSnapshot())
.thenReturn(getResourceMetadataCompleted);

ListenableFuture<Map<XdsResourceType<?>, Map<String, String>>>
getResourceAuthorityCompleted = Futures.immediateFuture(authorityByType);
when(mockXdsClient.getSubscribedResourcesAuthoritySnapshot())
.thenReturn(getResourceAuthorityCompleted);

reporter.reportCallbackMetrics(mockBatchRecorder, mockXdsClient);

// LDS resource requested
verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"),
eq(1L), eq(Arrays.asList(target, "requested", listenerResource.typeUrl())), any());
eq(1L), eq(Arrays.asList(target, "authority1",
"requested", listenerResource.typeUrl())), any());
// LDS resources acked
verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"),
eq(2L), eq(Arrays.asList(target, "acked", listenerResource.typeUrl())), any());
eq(2L), eq(Arrays.asList(target, "authority3",
"acked", listenerResource.typeUrl())), any());
// LDS resource nacked but cached
verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"),
eq(1L), eq(Arrays.asList(target, "nacked_but_cached", listenerResource.typeUrl())), any());
eq(1L), eq(Arrays.asList(target, "authority4",
"nacked_but_cached", listenerResource.typeUrl())), any());

// RDS resource nacked
verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"),
eq(1L), eq(Arrays.asList(target, "nacked", routeConfigResource.typeUrl())), any());
eq(1L), eq(Arrays.asList(target, "authority5",
"nacked", routeConfigResource.typeUrl())), any());
// RDS resource does not exist
verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"),
eq(1L), eq(Arrays.asList(target, "does_not_exist", routeConfigResource.typeUrl())), any());
eq(1L), eq(Arrays.asList(target, "authority6",
"does_not_exist", routeConfigResource.typeUrl())), any());

// CDS resource unknown
verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"),
eq(1L), eq(Arrays.asList(target, "unknown", clusterResource.typeUrl())), any());
eq(1L), eq(Arrays.asList(target, "authority7",
"unknown", clusterResource.typeUrl())), any());
verifyNoMoreInteractions(mockBatchRecorder);
}

@Test
public void reportCallbackMetrics_computeAndReportResourceCounts_emptyResources() {
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType = new HashMap<>();
Map<XdsResourceType<?>, Map<String, String>> authorityByType = new HashMap<>();
XdsResourceType<?> listenerResource = XdsListenerResource.getInstance();
metadataByType.put(listenerResource, Collections.emptyMap());
authorityByType.put(listenerResource, Collections.emptyMap());

SettableFuture<Void> reportServerConnectionsCompleted = SettableFuture.create();
reportServerConnectionsCompleted.set(null);
Expand All @@ -318,6 +351,11 @@ public void reportCallbackMetrics_computeAndReportResourceCounts_emptyResources(
when(mockXdsClient.getSubscribedResourcesMetadataSnapshot())
.thenReturn(getResourceMetadataCompleted);

ListenableFuture<Map<XdsResourceType<?>, Map<String, String>>>
getAuthorityCompleted = Futures.immediateFuture(authorityByType);
when(mockXdsClient.getSubscribedResourcesAuthoritySnapshot())
.thenReturn(getAuthorityCompleted);

reporter.reportCallbackMetrics(mockBatchRecorder, mockXdsClient);

// Verify that reportResourceCountGauge is never called
Expand Down