Skip to content
Open
35 changes: 32 additions & 3 deletions bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"hash/crc32"
"io"
"log"
"net/url"
"os"
"strconv"
Expand Down Expand Up @@ -80,6 +81,7 @@ type Client struct {
retryOption gax.CallOption
executeQueryRetryOption gax.CallOption
enableDirectAccess bool
logger *log.Logger
}

// ClientConfig has configurations for the client.
Expand All @@ -94,6 +96,10 @@ type ClientConfig struct {
//
// TODO: support user provided meter provider
MetricsProvider MetricsProvider

// Logger is the logger to use for this client. If it is nil, all logging
// will be directed to the standard logger.
Logger *log.Logger
}

// MetricsProvider is a wrapper for built in metrics meter provider
Expand Down Expand Up @@ -175,9 +181,31 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
var connPoolErr error
enableBigtableConnPool := btopt.EnableBigtableConnectionPool()
if enableBigtableConnPool {
connPool, connPoolErr = btransport.NewBigtableChannelPool(defaultBigtableConnPoolSize, btopt.BigtableLoadBalancingStrategy(), func() (*grpc.ClientConn, error) {
return gtransport.Dial(ctx, o...)
})
// Return *BigtableConn.
dialFunc := func() (*btransport.BigtableConn, error) {
// First, establish the underlying gRPC connection.
grpcConn, err := gtransport.Dial(ctx, o...)
if err != nil {
return nil, err
}
// Then, wrap the grpcConn in a BigtableConn, providing instance and app profile.
return btransport.NewBigtableConn(grpcConn, instance, config.AppProfile), nil
}

// Initialize the BigtableChannelPool with the updated dialFunc.
// btransport is assumed to be the package where BigtableChannelPool resides.
connPool, connPoolErr = btransport.NewBigtableChannelPool(
ctx,
defaultBigtableConnPoolSize,
btopt.BigtableLoadBalancingStrategy(),
dialFunc,
config.Logger,
nil,
btransport.WithHealthCheckConfig(btopt.DefaultHealthCheckConfig()),
btransport.WithDynamicChannelPool(btopt.DefaultDynamicChannelPoolConfig(defaultBigtableConnPoolSize)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is it possible to pass in default channel pool size in one place?

btransport.WithMetricsReporterConfig(btopt.DefaultMetricsReporterConfig()),
)
connPool, connPoolErr = btransport.NewBigtableChannelPool(ctx, defaultBigtableConnPoolSize, btopt.BigtableLoadBalancingStrategy(), dialFunc, config.Logger, nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it duplicate? so should it be with dynamic channel pool, healthcheck, and metrics or not?

} else {
// use to regular ConnPool
connPool, connPoolErr = gtransport.DialPool(ctx, o...)
Expand All @@ -198,6 +226,7 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
retryOption: retryOption,
executeQueryRetryOption: executeQueryRetryOption,
enableDirectAccess: enableDirectAccess,
logger: config.Logger,
}, nil
}

Expand Down
101 changes: 101 additions & 0 deletions bigtable/internal/option/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import (
"context"
"fmt"
"log"
"os"
"strconv"
"strings"
Expand Down Expand Up @@ -201,3 +202,103 @@
}
return enableBigtableConnPool
}

// Logf logs the given message to the given logger, or the standard logger if
// the given logger is nil.
func logf(logger *log.Logger, format string, v ...interface{}) {
if logger == nil {
log.Printf(format, v...)
} else {
logger.Printf(format, v...)
}
}

var debug = os.Getenv("CBT_ENABLE_DEBUG") == "true"

// Debugf logs the given message *only if* the global Debug flag is true.
// It reuses Logf to handle the nil logger logic and prepends "DEBUG: "
// to the message.
func Debugf(logger *log.Logger, format string, v ...interface{}) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is weird to see logging functionality in the option package. More like util or maybe just include in the connpool file? Also, should not be exported in that case.

// Only log if the Debug flag is set
if debug {
// Prepend "DEBUG: " to the format string
debugFormat := "DEBUG: " + format
logf(logger, debugFormat, v...)
}
}

// HealthCheckConfig holds the parameters for channel pool health checking.
type HealthCheckConfig struct {
// Enabled for toggle
Enabled bool
// ProbeInterval is the interval at which channel health is probed.
ProbeInterval time.Duration
// ProbeTimeout is the deadline for each individual health check probe RPC.
ProbeTimeout time.Duration
// WindowDuration is the duration over which probe results are kept for health evaluation.
WindowDuration time.Duration
// MinProbesForEval is the minimum number of probes required before a channel's health is evaluated.
MinProbesForEval int
// FailurePercentThresh is the percentage of failed probes within the window duration
// that will cause a channel to be considered unhealthy.
FailurePercentThresh int
// PoolwideBadThreshPercent is the "circuit breaker" threshold. If this percentage
// of channels in the pool are unhealthy, no evictions will occur.
PoolwideBadThreshPercent int
// MinEvictionInterval is the minimum time that must pass between eviction of unhealthy channels.
MinEvictionInterval time.Duration
}

// DynamicChannelPoolConfig holds the parameters for dynamic channel pool scaling.
type DynamicChannelPoolConfig struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we exposing any of these for customers to tune?

Enabled bool // Whether dynamic scaling is enabled.
MinConns int // Minimum number of connections in the pool.
MaxConns int // Maximum number of connections in the pool.
AvgLoadHighThreshold int32 // Average weighted load per connection to trigger scale-up.
AvgLoadLowThreshold int32 // Average weighted load per connection to trigger scale-down.
MinScalingInterval time.Duration // Minimum time between scaling operations (both up and down).
CheckInterval time.Duration // How often to check if scaling is needed.
MaxRemoveConns int // Maximum number of connections to remove at once.
}

// DefaultDynamicChannelPoolConfig is default settings for dynamic channel pool
func DefaultDynamicChannelPoolConfig(initialConns int) DynamicChannelPoolConfig {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

initialConns not used?

return DynamicChannelPoolConfig{
Enabled: true, // Enabled by default
MinConns: 10,
MaxConns: 200,
AvgLoadHighThreshold: 50, // Example thresholds, these likely need tuning
AvgLoadLowThreshold: 10,
MinScalingInterval: 1 * time.Minute,
CheckInterval: 30 * time.Second,
MaxRemoveConns: 2, // Cap for removals
}
}

// DefaultHealthCheckConfig for HealthCheckConfig
func DefaultHealthCheckConfig() HealthCheckConfig {
return HealthCheckConfig{
Enabled: true,
ProbeInterval: 30 * time.Second,
ProbeTimeout: 1 * time.Second,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 second seems long. lets make this consistent with Java? the probe deadline is 500 ms

WindowDuration: 5 * time.Minute,
MinProbesForEval: 4,
FailurePercentThresh: 60,
PoolwideBadThreshPercent: 70,
MinEvictionInterval: 1 * time.Minute,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems short, maybe lets make it consistent with java as well which is 10 minutes ?

}
}

// MetricsReportingConfig for periodic reporting

Check failure on line 292 in bigtable/internal/option/option.go

View workflow job for this annotation

GitHub Actions / vet

comment on exported type MetricsReporterConfig should be of the form "MetricsReporterConfig ..." (with optional leading article)
// MetricsReporterConfig holds the parameters for metrics reporting.
type MetricsReporterConfig struct {
Enabled bool
ReportingInterval time.Duration
}

func DefaultMetricsReporterConfig() MetricsReporterConfig {

Check failure on line 299 in bigtable/internal/option/option.go

View workflow job for this annotation

GitHub Actions / vet

exported function DefaultMetricsReporterConfig should have comment or be unexported
return MetricsReporterConfig{
Enabled: true,
ReportingInterval: 1 * time.Minute,
}
}
Loading
Loading