Skip to content

Commit

Permalink
Merge pull request #1878 from josephschorr/read-replica-support
Browse files Browse the repository at this point in the history
Read replica support for Postgres and MySQL datastores
  • Loading branch information
josephschorr authored Jul 1, 2024
2 parents 5ee8e07 + 0656701 commit 9456ce0
Show file tree
Hide file tree
Showing 17 changed files with 1,230 additions and 73 deletions.
10 changes: 10 additions & 0 deletions internal/datastore/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,13 @@ func RedactAndLogSensitiveConnString(ctx context.Context, baseErr string, err er
log.Ctx(ctx).Trace().Msg(baseErr + ": " + filtered)
return fmt.Errorf("%s. To view details of this error (that may contain sensitive information), please run with --log-level=trace", baseErr)
}

// RevisionUnavailableError is returned when a revision is not available on a replica.
type RevisionUnavailableError struct {
error
}

// NewRevisionUnavailableError creates a new RevisionUnavailableError.
func NewRevisionUnavailableError(err error) error {
return RevisionUnavailableError{err}
}
16 changes: 15 additions & 1 deletion internal/datastore/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func SeparateContextWithTracing(ctx context.Context) context.Context {
//
// This is useful for datastores that do not want to close connections when a
// cancel or deadline occurs.
func NewSeparatingContextDatastoreProxy(d datastore.Datastore) datastore.Datastore {
func NewSeparatingContextDatastoreProxy(d datastore.Datastore) datastore.StrictReadDatastore {
return &ctxProxy{d}
}

Expand All @@ -47,6 +47,20 @@ func (p *ctxProxy) ReadWriteTx(
return p.delegate.ReadWriteTx(ctx, f, opts...)
}

func (p *ctxProxy) IsStrictReadModeEnabled() bool {
ds := p.delegate
unwrapped, ok := p.delegate.(datastore.UnwrappableDatastore)
if ok {
ds = unwrapped.Unwrap()
}

if srm, ok := ds.(datastore.StrictReadDatastore); ok {
return srm.IsStrictReadModeEnabled()
}

return false
}

func (p *ctxProxy) OptimizedRevision(ctx context.Context) (datastore.Revision, error) {
return p.delegate.OptimizedRevision(SeparateContextWithTracing(ctx))
}
Expand Down
62 changes: 44 additions & 18 deletions internal/datastore/mysql/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ const (
noLastInsertID = 0
seedingTimeout = 10 * time.Second

primaryInstanceID = -1

// https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html#error_er_lock_wait_timeout
errMysqlLockWaitTimeout = 1205

Expand Down Expand Up @@ -102,15 +104,30 @@ type sqlFilter interface {
// URI: [scheme://][user[:[password]]@]host[:port][/schema][?attribute1=value1&attribute2=value2...
// See https://dev.mysql.com/doc/refman/8.0/en/connecting-using-uri-or-key-value-pairs.html
func NewMySQLDatastore(ctx context.Context, uri string, options ...Option) (datastore.Datastore, error) {
ds, err := newMySQLDatastore(ctx, uri, options...)
ds, err := newMySQLDatastore(ctx, uri, primaryInstanceID, options...)
if err != nil {
return nil, err
}

return datastoreinternal.NewSeparatingContextDatastoreProxy(ds), nil
}

func NewReadOnlyMySQLDatastore(
ctx context.Context,
url string,
index uint32,
options ...Option,
) (datastore.ReadOnlyDatastore, error) {
ds, err := newMySQLDatastore(ctx, url, int(index), options...)
if err != nil {
return nil, err
}

return datastoreinternal.NewSeparatingContextDatastoreProxy(ds), nil
}

func newMySQLDatastore(ctx context.Context, uri string, options ...Option) (*Datastore, error) {
func newMySQLDatastore(ctx context.Context, uri string, replicaIndex int, options ...Option) (*Datastore, error) {
isPrimary := replicaIndex == primaryInstanceID
config, err := generateConfig(options)
if err != nil {
return nil, fmt.Errorf(errUnableToInstantiate, err)
Expand Down Expand Up @@ -162,14 +179,21 @@ func newMySQLDatastore(ctx context.Context, uri string, options ...Option) (*Dat
return nil, common.RedactAndLogSensitiveConnString(ctx, "NewMySQLDatastore: unable to instrument connector", err, uri)
}

dbName := "spicedb"
if replicaIndex != primaryInstanceID {
dbName = fmt.Sprintf("spicedb_replica_%d", replicaIndex)
}

db = sql.OpenDB(connector)
collector := sqlstats.NewStatsCollector("spicedb", db)
collector := sqlstats.NewStatsCollector(dbName, db)
if err := prometheus.Register(collector); err != nil {
return nil, fmt.Errorf(errUnableToInstantiate, err)
}

if err := common.RegisterGCMetrics(); err != nil {
return nil, fmt.Errorf(errUnableToInstantiate, err)
if isPrimary {
if err := common.RegisterGCMetrics(); err != nil {
return nil, fmt.Errorf(errUnableToInstantiate, err)
}
}
} else {
db = sql.OpenDB(connector)
Expand Down Expand Up @@ -256,19 +280,21 @@ func newMySQLDatastore(ctx context.Context, uri string, options ...Option) (*Dat
}

// Start a goroutine for garbage collection.
if store.gcInterval > 0*time.Minute && config.gcEnabled {
store.gcGroup, store.gcCtx = errgroup.WithContext(store.gcCtx)
store.gcGroup.Go(func() error {
return common.StartGarbageCollector(
store.gcCtx,
store,
store.gcInterval,
store.gcWindow,
store.gcTimeout,
)
})
} else {
log.Warn().Msg("datastore background garbage collection disabled")
if isPrimary {
if store.gcInterval > 0*time.Minute && config.gcEnabled {
store.gcGroup, store.gcCtx = errgroup.WithContext(store.gcCtx)
store.gcGroup.Go(func() error {
return common.StartGarbageCollector(
store.gcCtx,
store,
store.gcInterval,
store.gcWindow,
store.gcTimeout,
)
})
} else {
log.Warn().Msg("datastore background garbage collection disabled")
}
}

return store, nil
Expand Down
5 changes: 3 additions & 2 deletions internal/datastore/mysql/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type datastoreTester struct {
func (dst *datastoreTester) createDatastore(revisionQuantization, gcInterval, gcWindow time.Duration, _ uint16) (datastore.Datastore, error) {
ctx := context.Background()
ds := dst.b.NewDatastore(dst.t, func(engine, uri string) datastore.Datastore {
ds, err := newMySQLDatastore(ctx, uri,
ds, err := newMySQLDatastore(ctx, uri, primaryInstanceID,
RevisionQuantization(revisionQuantization),
GCWindow(gcWindow),
GCInterval(gcInterval),
Expand Down Expand Up @@ -82,7 +82,7 @@ func createDatastoreTest(b testdatastore.RunningEngineForTest, tf datastoreTestF
return func(t *testing.T) {
ctx := context.Background()
ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
ds, err := newMySQLDatastore(ctx, uri, options...)
ds, err := newMySQLDatastore(ctx, uri, primaryInstanceID, options...)
require.NoError(t, err)
return ds
})
Expand Down Expand Up @@ -568,6 +568,7 @@ func QuantizedRevisionTest(t *testing.T, b testdatastore.RunningEngineForTest) {
ds, err := newMySQLDatastore(
ctx,
uri,
primaryInstanceID,
RevisionQuantization(5*time.Second),
GCWindow(24*time.Hour),
WatchBufferLength(1),
Expand Down
12 changes: 12 additions & 0 deletions internal/datastore/postgres/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type postgresOptions struct {
enablePrometheusStats bool
analyzeBeforeStatistics bool
gcEnabled bool
readStrictMode bool

migrationPhase string

Expand Down Expand Up @@ -61,6 +62,7 @@ const (
defaultMaxRetries = 10
defaultGCEnabled = true
defaultCredentialsProviderName = ""
defaultReadStrictMode = false
)

// Option provides the facility to configure how clients within the
Expand All @@ -80,6 +82,7 @@ func generateConfig(options []Option) (postgresOptions, error) {
maxRetries: defaultMaxRetries,
gcEnabled: defaultGCEnabled,
credentialsProviderName: defaultCredentialsProviderName,
readStrictMode: defaultReadStrictMode,
queryInterceptor: nil,
}

Expand All @@ -103,6 +106,15 @@ func generateConfig(options []Option) (postgresOptions, error) {
return computed, nil
}

// ReadStrictMode sets whether strict mode is used for reads in the Postgres reader. If enabled,
// an assertion is added into the WHERE clause of all read queries to ensure that the revision
// being read is available on the read connection.
//
// Strict mode is disabled by default, as the default behavior is to read from the primary.
func ReadStrictMode(readStrictMode bool) Option {
return func(po *postgresOptions) { po.readStrictMode = readStrictMode }
}

// ReadConnHealthCheckInterval is the frequency at which both idle and max
// lifetime connections are checked, and also the frequency at which the
// minimum number of connections is checked.
Expand Down
Loading

0 comments on commit 9456ce0

Please sign in to comment.