Skip to content

Commit

Permalink
Multi-Shoveler Open and Close (#131)
Browse files Browse the repository at this point in the history
* start up multiple reflectors
* Concurrent ldb downloads
* Add rotating reader and tests
* add convenience methods
* use go 1.13 friendly atomic interface
* add support for multiple LDB copies in InitContainer
* only create dbs if they don't exist
* reopen and close last rotated reader
* create a changelog for first ldb in multimode
  • Loading branch information
wavetylor authored Nov 30, 2023
1 parent 4839bef commit f6fdf2c
Show file tree
Hide file tree
Showing 14 changed files with 840 additions and 75 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/segmentio/go-sqlite3 v1.12.0
github.com/segmentio/stats/v4 v4.6.2
github.com/stretchr/testify v1.8.1
golang.org/x/sync v0.3.0
)

require (
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
12 changes: 8 additions & 4 deletions ldb_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
// across multiple processes.
type LDBReader struct {
Db *sql.DB
path string
pkCache map[string]schema.PrimaryKey // keyed by ldbTableName()
getRowByKeyStmtCache map[string]*sql.Stmt // keyed by ldbTableName()
getRowsByKeyPrefixStmtCache map[prefixCacheKey]*sql.Stmt
Expand All @@ -46,13 +47,17 @@ var (
ErrNoLedgerUpdates = errors.New("no ledger updates have been received yet")
)

type RowRetriever interface {
GetRowsByKeyPrefix(ctx context.Context, familyName string, tableName string, key ...interface{}) (*Rows, error)
GetRowByKey(ctx context.Context, out interface{}, familyName string, tableName string, key ...interface{}) (found bool, err error)
}

func newLDBReader(path string) (*LDBReader, error) {
db, err := newLDB(path)
if err != nil {
return nil, err
}

return &LDBReader{Db: db}, nil
return &LDBReader{Db: db, path: path}, nil
}

func newVersionedLDBReader(dirPath string) (*LDBReader, error) {
Expand Down Expand Up @@ -344,7 +349,6 @@ func (reader *LDBReader) closeDB() error {
if reader.Db != nil {
return reader.Db.Close()
}

return nil
}

Expand All @@ -369,7 +373,7 @@ func (reader *LDBReader) Ping(ctx context.Context) bool {
// to the type of each PK column.
func convertKeyBeforeQuery(pk schema.PrimaryKey, key []interface{}) error {
for i, k := range key {
// sanity check on th elength of the pk field type slice
// sanity check on the length of the pk field type slice
if i >= len(pk.Types) {
return errors.New("insufficient key field type data")
}
Expand Down
168 changes: 168 additions & 0 deletions ldb_rotating_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package ctlstore

import (
"context"
"errors"
"fmt"
"github.com/segmentio/ctlstore/pkg/errs"
"github.com/segmentio/ctlstore/pkg/globalstats"
"github.com/segmentio/ctlstore/pkg/ldb"
"github.com/segmentio/events/v2"
"github.com/segmentio/stats/v4"
"path"
"strconv"
"sync/atomic"
"time"
)

// LDBRotatingReader reads data from multiple LDBs on a rotating schedule.
// The main benefit is relieving read pressure on a particular ldb file when it becomes inactive,
// allowing sqlite maintenance
type LDBRotatingReader struct {
active int32
dbs []*LDBReader
schedule []int8
now func() time.Time
tickerInterval time.Duration
}

// RotationPeriod how many minutes each reader is active for before rotating to the next
type RotationPeriod int

const (
// Every30 rotate on 30 minute mark in an hour
Every30 RotationPeriod = 30
// Every20 rotate on 20 minute marks in an hour
Every20 RotationPeriod = 20
// Every15 rotate on 15 minute marks in an hour
Every15 RotationPeriod = 15
// Every10 rotate on 10 minute marks in an hour
Every10 RotationPeriod = 10
// Every6 rotate on 6 minute marks in an hour
Every6 RotationPeriod = 6

// for simpler migration, the first ldb retains the original name
defaultPath = DefaultCtlstorePath + ldb.DefaultLDBFilename
ldbFormat = DefaultCtlstorePath + "ldb-%d.db"
)

func defaultPaths(count int) []string {
paths := []string{defaultPath}
for i := 1; i < count; i++ {
paths = append(paths, fmt.Sprintf(ldbFormat, i+1))
}
return paths
}

// RotatingReader creates a new reader that rotates which ldb it reads from on a rotation period with the default location in /var/spool/ctlstore
func RotatingReader(ctx context.Context, minutesPerRotation RotationPeriod, ldbsCount int) (RowRetriever, error) {
return CustomerRotatingReader(ctx, minutesPerRotation, defaultPaths(ldbsCount)...)
}

// CustomerRotatingReader creates a new reader that rotates which ldb it reads from on a rotation period
func CustomerRotatingReader(ctx context.Context, minutesPerRotation RotationPeriod, ldbPaths ...string) (RowRetriever, error) {
r, err := rotatingReader(minutesPerRotation, ldbPaths...)
if err != nil {
return nil, err
}
r.setActive()
go r.rotate(ctx)
return r, nil
}

func rotatingReader(minutesPerRotation RotationPeriod, ldbPaths ...string) (*LDBRotatingReader, error) {
if len(ldbPaths) < 2 {
return nil, errors.New("RotatingReader requires more than 1 ldb")
}
if !isValid(minutesPerRotation) {
return nil, errors.New(fmt.Sprintf("invalid rotation period: %v", minutesPerRotation))
}
if len(ldbPaths) > 60/int(minutesPerRotation) {
return nil, errors.New("cannot have more ldbs than rotations per hour")
}
var r LDBRotatingReader
for _, p := range ldbPaths {
events.Log("Opening ldb %s for reading", p)
reader, err := newLDBReader(p)
if err != nil {
return nil, err
}
r.dbs = append(r.dbs, reader)
}
r.schedule = make([]int8, 60)
idx := 0
for i := 1; i < 61; i++ {
r.schedule[i-1] = int8(idx % len(ldbPaths))
if i%int(minutesPerRotation) == 0 {
idx++
}
}
return &r, nil
}

func (r *LDBRotatingReader) setActive() {
if r.now == nil {
r.now = time.Now
}
atomic.StoreInt32(&r.active, int32(r.schedule[r.now().Minute()]))
}

// GetRowsByKeyPrefix delegates to the active LDBReader
func (r *LDBRotatingReader) GetRowsByKeyPrefix(ctx context.Context, familyName string, tableName string, key ...interface{}) (*Rows, error) {
return r.dbs[atomic.LoadInt32(&r.active)].GetRowsByKeyPrefix(ctx, familyName, tableName, key...)
}

// GetRowByKey delegates to the active LDBReader
func (r *LDBRotatingReader) GetRowByKey(ctx context.Context, out interface{}, familyName string, tableName string, key ...interface{}) (found bool, err error) {
return r.dbs[atomic.LoadInt32(&r.active)].GetRowByKey(ctx, out, familyName, tableName, key...)
}

// rotate by default checks every 1 minute if the active db has changed according to schedule
func (r *LDBRotatingReader) rotate(ctx context.Context) {
if r.tickerInterval == 0 {
r.tickerInterval = 1 * time.Minute
}
ticker := time.NewTicker(r.tickerInterval)

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
next := r.schedule[r.now().Minute()]
last := atomic.LoadInt32(&r.active)

// move the next to active and close and reopen the last one
if int32(next) != last {
atomic.StoreInt32(&r.active, int32(next))
stats.Incr("rotating_reader.rotate")
globalstats.Set("rotating_reader.active", next)
err := r.dbs[last].Close()
if err != nil {
events.Log("failed to close LDBReader for %s on rotation: %{error}v", r.dbs[last].path, err)
errs.Incr("rotating_reader.closing_ldbreader", stats.T("id", strconv.Itoa(int(last))))
return
}

reader, err := newLDBReader(r.dbs[last].path)
if err != nil {
events.Log("failed to open LDBReader for %s on rotation: %{error}v", r.dbs[last].path, err)
errs.Incr("rotating_reader.opening_ldbreader",
stats.T("id", strconv.Itoa(int(last))),
stats.T("path", path.Base(r.dbs[last].path)))
return
}
r.dbs[last] = reader

}
}
}
}

func isValid(rf RotationPeriod) bool {
switch rf {
case Every6, Every10, Every15, Every20, Every30:
return true
}
return false
}
Loading

0 comments on commit f6fdf2c

Please sign in to comment.