-
Notifications
You must be signed in to change notification settings - Fork 1.5k
feat(internal): implement Bigtable specific channel pool optimizations #13226
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
f8a1e85
47fcb2f
6faf794
91325f9
adef432
7b6928a
76c7300
90f4206
3e16e1f
0f773dd
794b450
dca6949
174de08
207f660
4582ca6
f6c006f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,7 @@ import ( | |
| "fmt" | ||
| "hash/crc32" | ||
| "io" | ||
| "log" | ||
| "net/url" | ||
| "os" | ||
| "strconv" | ||
|
|
@@ -80,6 +81,7 @@ type Client struct { | |
| retryOption gax.CallOption | ||
| executeQueryRetryOption gax.CallOption | ||
| enableDirectAccess bool | ||
| logger *log.Logger | ||
| } | ||
|
|
||
| // ClientConfig has configurations for the client. | ||
|
|
@@ -94,6 +96,10 @@ type ClientConfig struct { | |
| // | ||
| // TODO: support user provided meter provider | ||
| MetricsProvider MetricsProvider | ||
|
|
||
| // Logger is the logger to use for this client. If it is nil, all logging | ||
| // will be directed to the standard logger. | ||
| Logger *log.Logger | ||
| } | ||
|
|
||
| // MetricsProvider is a wrapper for built in metrics meter provider | ||
|
|
@@ -175,9 +181,31 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C | |
| var connPoolErr error | ||
| enableBigtableConnPool := btopt.EnableBigtableConnectionPool() | ||
| if enableBigtableConnPool { | ||
| connPool, connPoolErr = btransport.NewBigtableChannelPool(defaultBigtableConnPoolSize, btopt.BigtableLoadBalancingStrategy(), func() (*grpc.ClientConn, error) { | ||
| return gtransport.Dial(ctx, o...) | ||
| }) | ||
| // Return *BigtableConn. | ||
| dialFunc := func() (*btransport.BigtableConn, error) { | ||
| // First, establish the underlying gRPC connection. | ||
| grpcConn, err := gtransport.Dial(ctx, o...) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| // Then, wrap the grpcConn in a BigtableConn, providing instance and app profile. | ||
| return btransport.NewBigtableConn(grpcConn, instance, config.AppProfile), nil | ||
| } | ||
|
|
||
| // Initialize the BigtableChannelPool with the updated dialFunc. | ||
| // btransport is assumed to be the package where BigtableChannelPool resides. | ||
| connPool, connPoolErr = btransport.NewBigtableChannelPool( | ||
| ctx, | ||
| defaultBigtableConnPoolSize, | ||
| btopt.BigtableLoadBalancingStrategy(), | ||
| dialFunc, | ||
| config.Logger, | ||
| nil, | ||
| btransport.WithHealthCheckConfig(btopt.DefaultHealthCheckConfig()), | ||
| btransport.WithDynamicChannelPool(btopt.DefaultDynamicChannelPoolConfig(defaultBigtableConnPoolSize)), | ||
| btransport.WithMetricsReporterConfig(btopt.DefaultMetricsReporterConfig()), | ||
| ) | ||
| ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is it duplicate? so should it be with dynamic channel pool, healthcheck, and metrics or not? |
||
| } else { | ||
| // use to regular ConnPool | ||
| connPool, connPoolErr = gtransport.DialPool(ctx, o...) | ||
|
|
@@ -198,6 +226,7 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C | |
| retryOption: retryOption, | ||
| executeQueryRetryOption: executeQueryRetryOption, | ||
| enableDirectAccess: enableDirectAccess, | ||
| logger: config.Logger, | ||
| }, nil | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,7 @@ | |
| import ( | ||
| "context" | ||
| "fmt" | ||
| "log" | ||
| "os" | ||
| "strconv" | ||
| "strings" | ||
|
|
@@ -201,3 +202,103 @@ | |
| } | ||
| return enableBigtableConnPool | ||
| } | ||
|
|
||
| // Logf logs the given message to the given logger, or the standard logger if | ||
| // the given logger is nil. | ||
| func logf(logger *log.Logger, format string, v ...interface{}) { | ||
| if logger == nil { | ||
| log.Printf(format, v...) | ||
| } else { | ||
| logger.Printf(format, v...) | ||
| } | ||
| } | ||
|
|
||
| var debug = os.Getenv("CBT_ENABLE_DEBUG") == "true" | ||
|
|
||
| // Debugf logs the given message *only if* the global Debug flag is true. | ||
| // It reuses Logf to handle the nil logger logic and prepends "DEBUG: " | ||
| // to the message. | ||
| func Debugf(logger *log.Logger, format string, v ...interface{}) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it is weird to see logging functionality in the |
||
| // Only log if the Debug flag is set | ||
| if debug { | ||
| // Prepend "DEBUG: " to the format string | ||
| debugFormat := "DEBUG: " + format | ||
| logf(logger, debugFormat, v...) | ||
| } | ||
| } | ||
|
|
||
| // HealthCheckConfig holds the parameters for channel pool health checking. | ||
| type HealthCheckConfig struct { | ||
| // Enabled for toggle | ||
| Enabled bool | ||
| // ProbeInterval is the interval at which channel health is probed. | ||
| ProbeInterval time.Duration | ||
| // ProbeTimeout is the deadline for each individual health check probe RPC. | ||
| ProbeTimeout time.Duration | ||
| // WindowDuration is the duration over which probe results are kept for health evaluation. | ||
| WindowDuration time.Duration | ||
| // MinProbesForEval is the minimum number of probes required before a channel's health is evaluated. | ||
| MinProbesForEval int | ||
| // FailurePercentThresh is the percentage of failed probes within the window duration | ||
| // that will cause a channel to be considered unhealthy. | ||
| FailurePercentThresh int | ||
| // PoolwideBadThreshPercent is the "circuit breaker" threshold. If this percentage | ||
| // of channels in the pool are unhealthy, no evictions will occur. | ||
| PoolwideBadThreshPercent int | ||
| // MinEvictionInterval is the minimum time that must pass between eviction of unhealthy channels. | ||
| MinEvictionInterval time.Duration | ||
| } | ||
|
|
||
| // DynamicChannelPoolConfig holds the parameters for dynamic channel pool scaling. | ||
| type DynamicChannelPoolConfig struct { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are we exposing any of these for customers to tune? |
||
| Enabled bool // Whether dynamic scaling is enabled. | ||
| MinConns int // Minimum number of connections in the pool. | ||
| MaxConns int // Maximum number of connections in the pool. | ||
| AvgLoadHighThreshold int32 // Average weighted load per connection to trigger scale-up. | ||
| AvgLoadLowThreshold int32 // Average weighted load per connection to trigger scale-down. | ||
| MinScalingInterval time.Duration // Minimum time between scaling operations (both up and down). | ||
| CheckInterval time.Duration // How often to check if scaling is needed. | ||
| MaxRemoveConns int // Maximum number of connections to remove at once. | ||
| } | ||
|
|
||
| // DefaultDynamicChannelPoolConfig is default settings for dynamic channel pool | ||
| func DefaultDynamicChannelPoolConfig(initialConns int) DynamicChannelPoolConfig { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. initialConns not used? |
||
| return DynamicChannelPoolConfig{ | ||
| Enabled: true, // Enabled by default | ||
| MinConns: 10, | ||
| MaxConns: 200, | ||
| AvgLoadHighThreshold: 50, // Example thresholds, these likely need tuning | ||
| AvgLoadLowThreshold: 10, | ||
| MinScalingInterval: 1 * time.Minute, | ||
| CheckInterval: 30 * time.Second, | ||
| MaxRemoveConns: 2, // Cap for removals | ||
| } | ||
| } | ||
|
|
||
| // DefaultHealthCheckConfig for HealthCheckConfig | ||
| func DefaultHealthCheckConfig() HealthCheckConfig { | ||
| return HealthCheckConfig{ | ||
| Enabled: true, | ||
| ProbeInterval: 30 * time.Second, | ||
| ProbeTimeout: 1 * time.Second, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 1 second seems long. lets make this consistent with Java? the probe deadline is 500 ms |
||
| WindowDuration: 5 * time.Minute, | ||
| MinProbesForEval: 4, | ||
| FailurePercentThresh: 60, | ||
| PoolwideBadThreshPercent: 70, | ||
| MinEvictionInterval: 1 * time.Minute, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems short, maybe lets make it consistent with java as well which is 10 minutes ? |
||
| } | ||
| } | ||
|
|
||
| // MetricsReportingConfig for periodic reporting | ||
| // MetricsReporterConfig holds the parameters for metrics reporting. | ||
| type MetricsReporterConfig struct { | ||
| Enabled bool | ||
| ReportingInterval time.Duration | ||
| } | ||
|
|
||
| func DefaultMetricsReporterConfig() MetricsReporterConfig { | ||
| return MetricsReporterConfig{ | ||
| Enabled: true, | ||
| ReportingInterval: 1 * time.Minute, | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: is it possible to pass in default channel pool size in one place?