Skip to content

Commit

Permalink
xds: Add support for persistent filter cache
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiitk committed Feb 6, 2025
1 parent 44e92e2 commit 0705b6d
Show file tree
Hide file tree
Showing 18 changed files with 613 additions and 411 deletions.
170 changes: 91 additions & 79 deletions xds/src/main/java/io/grpc/xds/FaultFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import io.grpc.internal.GrpcUtil;
import io.grpc.xds.FaultConfig.FaultAbort;
import io.grpc.xds.FaultConfig.FaultDelay;
import io.grpc.xds.Filter.ClientInterceptorBuilder;
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
import java.util.Locale;
import java.util.concurrent.Executor;
Expand All @@ -56,10 +55,11 @@
import javax.annotation.Nullable;

/** HttpFault filter implementation. */
final class FaultFilter implements Filter, ClientInterceptorBuilder {
final class FaultFilter implements Filter {

static final FaultFilter INSTANCE =
private static final FaultFilter INSTANCE =
new FaultFilter(ThreadSafeRandomImpl.instance, new AtomicLong());

@VisibleForTesting
static final Metadata.Key<String> HEADER_DELAY_KEY =
Metadata.Key.of("x-envoy-fault-delay-request", Metadata.ASCII_STRING_MARSHALLER);
Expand Down Expand Up @@ -87,96 +87,108 @@ final class FaultFilter implements Filter, ClientInterceptorBuilder {
this.activeFaultCounter = activeFaultCounter;
}

@Override
public String[] typeUrls() {
return new String[] { TYPE_URL };
}

@Override
public ConfigOrError<FaultConfig> parseFilterConfig(Message rawProtoMessage) {
HTTPFault httpFaultProto;
if (!(rawProtoMessage instanceof Any)) {
return ConfigOrError.fromError("Invalid config type: " + rawProtoMessage.getClass());
static final class Provider implements Filter.Provider {
@Override
public String[] typeUrls() {
return new String[]{TYPE_URL};
}
Any anyMessage = (Any) rawProtoMessage;
try {
httpFaultProto = anyMessage.unpack(HTTPFault.class);
} catch (InvalidProtocolBufferException e) {
return ConfigOrError.fromError("Invalid proto: " + e);

@Override
public boolean isClientFilter() {
return true;
}
return parseHttpFault(httpFaultProto);
}

private static ConfigOrError<FaultConfig> parseHttpFault(HTTPFault httpFault) {
FaultDelay faultDelay = null;
FaultAbort faultAbort = null;
if (httpFault.hasDelay()) {
faultDelay = parseFaultDelay(httpFault.getDelay());
@Override
public FaultFilter newInstance() {
return INSTANCE;

Check warning on line 103 in xds/src/main/java/io/grpc/xds/FaultFilter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/FaultFilter.java#L103

Added line #L103 was not covered by tests
}
if (httpFault.hasAbort()) {
ConfigOrError<FaultAbort> faultAbortOrError = parseFaultAbort(httpFault.getAbort());
if (faultAbortOrError.errorDetail != null) {
return ConfigOrError.fromError(
"HttpFault contains invalid FaultAbort: " + faultAbortOrError.errorDetail);

@Override
public ConfigOrError<FaultConfig> parseFilterConfig(Message rawProtoMessage) {
HTTPFault httpFaultProto;
if (!(rawProtoMessage instanceof Any)) {
return ConfigOrError.fromError("Invalid config type: " + rawProtoMessage.getClass());

Check warning on line 110 in xds/src/main/java/io/grpc/xds/FaultFilter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/FaultFilter.java#L110

Added line #L110 was not covered by tests
}
faultAbort = faultAbortOrError.config;
}
Integer maxActiveFaults = null;
if (httpFault.hasMaxActiveFaults()) {
maxActiveFaults = httpFault.getMaxActiveFaults().getValue();
if (maxActiveFaults < 0) {
maxActiveFaults = Integer.MAX_VALUE;
Any anyMessage = (Any) rawProtoMessage;
try {
httpFaultProto = anyMessage.unpack(HTTPFault.class);
} catch (InvalidProtocolBufferException e) {
return ConfigOrError.fromError("Invalid proto: " + e);

Check warning on line 116 in xds/src/main/java/io/grpc/xds/FaultFilter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/FaultFilter.java#L115-L116

Added lines #L115 - L116 were not covered by tests
}
return parseHttpFault(httpFaultProto);
}
return ConfigOrError.fromConfig(FaultConfig.create(faultDelay, faultAbort, maxActiveFaults));
}

private static FaultDelay parseFaultDelay(
io.envoyproxy.envoy.extensions.filters.common.fault.v3.FaultDelay faultDelay) {
FaultConfig.FractionalPercent percent = parsePercent(faultDelay.getPercentage());
if (faultDelay.hasHeaderDelay()) {
return FaultDelay.forHeader(percent);
@Override
public ConfigOrError<FaultConfig> parseFilterConfigOverride(Message rawProtoMessage) {
return parseFilterConfig(rawProtoMessage);
}
return FaultDelay.forFixedDelay(Durations.toNanos(faultDelay.getFixedDelay()), percent);
}

@VisibleForTesting
static ConfigOrError<FaultAbort> parseFaultAbort(
io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort faultAbort) {
FaultConfig.FractionalPercent percent = parsePercent(faultAbort.getPercentage());
switch (faultAbort.getErrorTypeCase()) {
case HEADER_ABORT:
return ConfigOrError.fromConfig(FaultAbort.forHeader(percent));
case HTTP_STATUS:
return ConfigOrError.fromConfig(FaultAbort.forStatus(
GrpcUtil.httpStatusToGrpcStatus(faultAbort.getHttpStatus()), percent));
case GRPC_STATUS:
return ConfigOrError.fromConfig(FaultAbort.forStatus(
Status.fromCodeValue(faultAbort.getGrpcStatus()), percent));
case ERRORTYPE_NOT_SET:
default:
return ConfigOrError.fromError(
"Unknown error type case: " + faultAbort.getErrorTypeCase());
private static ConfigOrError<FaultConfig> parseHttpFault(HTTPFault httpFault) {
FaultDelay faultDelay = null;
FaultAbort faultAbort = null;
if (httpFault.hasDelay()) {
faultDelay = parseFaultDelay(httpFault.getDelay());
}
if (httpFault.hasAbort()) {
ConfigOrError<FaultAbort> faultAbortOrError = parseFaultAbort(httpFault.getAbort());
if (faultAbortOrError.errorDetail != null) {
return ConfigOrError.fromError(

Check warning on line 135 in xds/src/main/java/io/grpc/xds/FaultFilter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/FaultFilter.java#L135

Added line #L135 was not covered by tests
"HttpFault contains invalid FaultAbort: " + faultAbortOrError.errorDetail);
}
faultAbort = faultAbortOrError.config;
}
Integer maxActiveFaults = null;
if (httpFault.hasMaxActiveFaults()) {
maxActiveFaults = httpFault.getMaxActiveFaults().getValue();
if (maxActiveFaults < 0) {
maxActiveFaults = Integer.MAX_VALUE;

Check warning on line 144 in xds/src/main/java/io/grpc/xds/FaultFilter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/FaultFilter.java#L144

Added line #L144 was not covered by tests
}
}
return ConfigOrError.fromConfig(FaultConfig.create(faultDelay, faultAbort, maxActiveFaults));
}
}

private static FaultConfig.FractionalPercent parsePercent(FractionalPercent proto) {
switch (proto.getDenominator()) {
case HUNDRED:
return FaultConfig.FractionalPercent.perHundred(proto.getNumerator());
case TEN_THOUSAND:
return FaultConfig.FractionalPercent.perTenThousand(proto.getNumerator());
case MILLION:
return FaultConfig.FractionalPercent.perMillion(proto.getNumerator());
case UNRECOGNIZED:
default:
throw new IllegalArgumentException("Unknown denominator type: " + proto.getDenominator());
private static FaultDelay parseFaultDelay(
io.envoyproxy.envoy.extensions.filters.common.fault.v3.FaultDelay faultDelay) {
FaultConfig.FractionalPercent percent = parsePercent(faultDelay.getPercentage());
if (faultDelay.hasHeaderDelay()) {
return FaultDelay.forHeader(percent);

Check warning on line 154 in xds/src/main/java/io/grpc/xds/FaultFilter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/FaultFilter.java#L154

Added line #L154 was not covered by tests
}
return FaultDelay.forFixedDelay(Durations.toNanos(faultDelay.getFixedDelay()), percent);
}
}

@Override
public ConfigOrError<FaultConfig> parseFilterConfigOverride(Message rawProtoMessage) {
return parseFilterConfig(rawProtoMessage);
@VisibleForTesting
static ConfigOrError<FaultAbort> parseFaultAbort(
io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort faultAbort) {
FaultConfig.FractionalPercent percent = parsePercent(faultAbort.getPercentage());
switch (faultAbort.getErrorTypeCase()) {
case HEADER_ABORT:
return ConfigOrError.fromConfig(FaultAbort.forHeader(percent));
case HTTP_STATUS:
return ConfigOrError.fromConfig(FaultAbort.forStatus(
GrpcUtil.httpStatusToGrpcStatus(faultAbort.getHttpStatus()), percent));
case GRPC_STATUS:
return ConfigOrError.fromConfig(FaultAbort.forStatus(
Status.fromCodeValue(faultAbort.getGrpcStatus()), percent));
case ERRORTYPE_NOT_SET:
default:
return ConfigOrError.fromError(
"Unknown error type case: " + faultAbort.getErrorTypeCase());

Check warning on line 175 in xds/src/main/java/io/grpc/xds/FaultFilter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/FaultFilter.java#L174-L175

Added lines #L174 - L175 were not covered by tests
}
}

private static FaultConfig.FractionalPercent parsePercent(FractionalPercent proto) {
switch (proto.getDenominator()) {
case HUNDRED:
return FaultConfig.FractionalPercent.perHundred(proto.getNumerator());
case TEN_THOUSAND:
return FaultConfig.FractionalPercent.perTenThousand(proto.getNumerator());
case MILLION:
return FaultConfig.FractionalPercent.perMillion(proto.getNumerator());
case UNRECOGNIZED:
default:
throw new IllegalArgumentException("Unknown denominator type: " + proto.getDenominator());

Check warning on line 189 in xds/src/main/java/io/grpc/xds/FaultFilter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/FaultFilter.java#L189

Added line #L189 was not covered by tests
}
}
}

@Nullable
Expand Down
97 changes: 68 additions & 29 deletions xds/src/main/java/io/grpc/xds/Filter.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,53 +20,92 @@
import com.google.protobuf.Message;
import io.grpc.ClientInterceptor;
import io.grpc.ServerInterceptor;
import java.io.Closeable;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;

/**
* Defines the parsing functionality of an HTTP filter. A Filter may optionally implement either
* {@link ClientInterceptorBuilder} or {@link ServerInterceptorBuilder} or both, indicating it is
* capable of working on the client side or server side or both, respectively.
* Defines the parsing functionality of an HTTP filter.
*
* <p>A Filter may optionally implement either {@link Filter#buildClientInterceptor} or
* {@link Filter#buildServerInterceptor} or both, and return true from corresponding
* {@link Provider#isClientFilter()}, {@link Provider#isServerFilter()} to indicate that the filter
* is capable of working on the client side or server side or both, respectively.
*/
interface Filter {
interface Filter extends Closeable {

/**
* The proto message types supported by this filter. A filter will be registered by each of its
* supported message types.
*/
String[] typeUrls();
/** Represents an opaque data structure holding configuration for a filter. */
interface FilterConfig {
String typeUrl();
}

/**
* Parses the top-level filter config from raw proto message. The message may be either a {@link
* com.google.protobuf.Any} or a {@link com.google.protobuf.Struct}.
* Common interface for filter providers.
*/
ConfigOrError<? extends FilterConfig> parseFilterConfig(Message rawProtoMessage);
interface Provider {
/**
* The proto message types supported by this filter. A filter will be registered by each of its
* supported message types.
*/
String[] typeUrls();

/**
* Parses the per-filter override filter config from raw proto message. The message may be either
* a {@link com.google.protobuf.Any} or a {@link com.google.protobuf.Struct}.
*/
ConfigOrError<? extends FilterConfig> parseFilterConfigOverride(Message rawProtoMessage);
/**
* Whether the filter can be installed on the client side.
*
* <p>Return true if the filter implements {@link Filter#buildClientInterceptor}.
*/
default boolean isClientFilter() {
return false;
}

/** Represents an opaque data structure holding configuration for a filter. */
interface FilterConfig {
String typeUrl();
/**
* Whether the filter can be installed into xDS-enabled servers.
*
* <p>Return true if the filter implements {@link Filter#buildServerInterceptor}.
*/
default boolean isServerFilter() {
return false;
}

/**
* Creates a new instance of the filter.
*
* <p>TODO(sergiitk): [IMPL] better doc.
*/
Filter newInstance();

/**
* Parses the top-level filter config from raw proto message. The message may be either a {@link
* com.google.protobuf.Any} or a {@link com.google.protobuf.Struct}.
*/
ConfigOrError<? extends FilterConfig> parseFilterConfig(Message rawProtoMessage);

/**
* Parses the per-filter override filter config from raw proto message. The message may be
* either a {@link com.google.protobuf.Any} or a {@link com.google.protobuf.Struct}.
*/
ConfigOrError<? extends FilterConfig> parseFilterConfigOverride(Message rawProtoMessage);
}

/** Uses the FilterConfigs produced above to produce an HTTP filter interceptor for clients. */
interface ClientInterceptorBuilder {
@Nullable
ClientInterceptor buildClientInterceptor(
FilterConfig config, @Nullable FilterConfig overrideConfig,
ScheduledExecutorService scheduler);
@Nullable
default ClientInterceptor buildClientInterceptor(
FilterConfig config, @Nullable FilterConfig overrideConfig,
ScheduledExecutorService scheduler) {
return null;
}

/** Uses the FilterConfigs produced above to produce an HTTP filter interceptor for the server. */
interface ServerInterceptorBuilder {
@Nullable
ServerInterceptor buildServerInterceptor(
FilterConfig config, @Nullable FilterConfig overrideConfig);
@Nullable
default ServerInterceptor buildServerInterceptor(
FilterConfig config, @Nullable FilterConfig overrideConfig) {
return null;
}

@Override
default void close() {
// Optional cleanup on filter shutdown.
}

/** Filter config with instance name. */
Expand Down
16 changes: 8 additions & 8 deletions xds/src/main/java/io/grpc/xds/FilterRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,21 @@

/**
* A registry for all supported {@link Filter}s. Filters can be queried from the registry
* by any of the {@link Filter#typeUrls() type URLs}.
* by any of the {@link Filter.Provider#typeUrls() type URLs}.
*/
final class FilterRegistry {
private static FilterRegistry instance;

private final Map<String, Filter> supportedFilters = new HashMap<>();
private final Map<String, Filter.Provider> supportedFilters = new HashMap<>();

private FilterRegistry() {}

static synchronized FilterRegistry getDefaultRegistry() {
if (instance == null) {
instance = newRegistry().register(
FaultFilter.INSTANCE,
RouterFilter.INSTANCE,
RbacFilter.INSTANCE);
new FaultFilter.Provider(),
new RouterFilter.Provider(),
new RbacFilter.Provider());
}
return instance;
}
Expand All @@ -48,8 +48,8 @@ static FilterRegistry newRegistry() {
}

@VisibleForTesting
FilterRegistry register(Filter... filters) {
for (Filter filter : filters) {
FilterRegistry register(Filter.Provider... filters) {
for (Filter.Provider filter : filters) {
for (String typeUrl : filter.typeUrls()) {
supportedFilters.put(typeUrl, filter);
}
Expand All @@ -58,7 +58,7 @@ FilterRegistry register(Filter... filters) {
}

@Nullable
Filter get(String typeUrl) {
Filter.Provider get(String typeUrl) {
return supportedFilters.get(typeUrl);
}
}
Loading

0 comments on commit 0705b6d

Please sign in to comment.