Skip to content

Commit

Permalink
Feat (registry, loader): Set options when loading / using registered …
Browse files Browse the repository at this point in the history
…keepers

TODO: also need a mutex for default bitcask opt synchronization
  • Loading branch information
yunginnanet committed Jul 9, 2024
1 parent 672b3b9 commit b84c4a9
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 29 deletions.
5 changes: 4 additions & 1 deletion bitcask/bitcask.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"sync"
"sync/atomic"

"git.tcp.direct/Mirrors/bitcask-mirror"
"git.mills.io/prologic/bitcask"

"git.tcp.direct/tcp.direct/database"
"git.tcp.direct/tcp.direct/database/kv"
Expand Down Expand Up @@ -227,6 +227,9 @@ func (db *DB) discover(force ...bool) ([]string, error) {
aclosed.Store(false)
db.store[name] = &Store{Bitcask: c, closed: aclosed}
if db.meta == nil {
// TODO: verify this:
// bitcask should store it's config in each store's individual metadata files (whereas pogreb doesn't seem to)
// this means we don't need to put the configs into our metadata, which is fortunate because the config is unexported
db.meta = metadata.NewMeta("bitcask")
}
if db.meta.KnownStores == nil {
Expand Down
27 changes: 26 additions & 1 deletion bitcask/reg.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,37 @@
package bitcask

import (
"errors"

"git.mills.io/prologic/bitcask"

"git.tcp.direct/tcp.direct/database"
"git.tcp.direct/tcp.direct/database/registry"
)

var ErrBadOpt = errors.New("invalid bitcask options")

func init() {
registry.RegisterKeeper("bitcask", func(path string) (database.Keeper, error) {
registry.RegisterKeeper("bitcask", func(path string, opt ...any) (database.Keeper, error) {
bitcaskOptions := make([]bitcask.Option, 0, len(opt))
for _, o := range opt {
var casted bitcask.Option
switch v := o.(type) {
case bitcask.Option:
casted = v
case *bitcask.Option:
casted = *v
case []bitcask.Option:
bitcaskOptions = append(bitcaskOptions, v...)
continue
default:
return nil, ErrBadOpt
}
bitcaskOptions = append(bitcaskOptions, casted)
}
if len(bitcaskOptions) > 0 {
defaultBitcaskOptions = append(defaultBitcaskOptions, bitcaskOptions...)
}
db := OpenDB(path)
err := db.init()
return db, err
Expand Down
4 changes: 2 additions & 2 deletions loader/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"git.tcp.direct/tcp.direct/database/registry"
)

func OpenKeeper(path string) (database.Keeper, error) {
func OpenKeeper(path string, opts ...any) (database.Keeper, error) {
stat, statErr := os.Stat(path)
if statErr != nil {
return nil, statErr
Expand Down Expand Up @@ -41,7 +41,7 @@ func OpenKeeper(path string) (database.Keeper, error) {
if keeperCreator = registry.GetKeeper(meta.KeeperType); keeperCreator == nil {
return nil, fmt.Errorf("keeper type %s not found in registry", meta.KeeperType)
}
keeper, err := keeperCreator(path)
keeper, err := keeperCreator(path, meta.DefStoreOpts)
if err != nil {
return nil, fmt.Errorf("error substantiating keeper: %w", err)
}
Expand Down
14 changes: 11 additions & 3 deletions pogreb/opt.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pogreb
import (
"encoding/json"
"errors"
"sync"

"github.com/akrylysov/pogreb"
)
Expand All @@ -24,9 +25,9 @@ func SetPogrebOptions(options pogreb.Options) Option {
}

type WrappedOptions struct {
*pogreb.Options
*pogreb.Options `json:"options"`
// AllowRecovery allows the database to be recovered if a lockfile is detected upon running Init.
AllowRecovery bool
AllowRecovery bool `json:"allow_recovery,omitempty"`
}

func (w *WrappedOptions) MarshalJSON() ([]byte, error) {
Expand All @@ -48,6 +49,8 @@ var defaultPogrebOptions = &WrappedOptions{
AllowRecovery: false,
}

var defOptMu = sync.RWMutex{}

var ErrInvalidOptions = errors.New("invalid pogreb options")

func castOptions(pogrebopts ...any) (*WrappedOptions, error) {
Expand Down Expand Up @@ -81,13 +84,18 @@ func castOptions(pogrebopts ...any) (*WrappedOptions, error) {

// SetDefaultPogrebOptions options will set the options used for all subsequent pogreb stores that are initialized.
func SetDefaultPogrebOptions(pogrebopts ...any) (err error) {
defOptMu.Lock()
defaultPogrebOptions, err = castOptions(pogrebopts...)
defOptMu.Unlock()
return
}

func normalizeOptions(opts ...any) *WrappedOptions {
if len(opts) == 0 {
return defaultPogrebOptions
defOptMu.RLock()
defOpt := defaultPogrebOptions
defOptMu.RUnlock()
return defOpt
}

opt, err := castOptions(opts[0])
Expand Down
50 changes: 31 additions & 19 deletions pogreb/pogreb.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,16 @@ func (db *DB) _init() error {
}

if errors.Is(err, os.ErrNotExist) {
db.meta, err = metadata.NewMetaFile(db.Type(), filepath.Join(db.path, "meta.json"))
dest := filepath.Join(db.path, "meta.json")
f, ferr := os.Create(dest)
if ferr != nil {
return fmt.Errorf("error creating meta file: %w", ferr)
}
defOptMu.RLock()
db.meta = metadata.NewMeta(metadata.KeeperType(db.Type())).
WithDefaultStoreOpts(defaultPogrebOptions).WithWriter(f)
defOptMu.RUnlock()
err = db.meta.Sync()
if err != nil {
return fmt.Errorf("error creating meta file: %w", err)
}
Expand Down Expand Up @@ -287,7 +296,9 @@ func (db *DB) Init(storeName string, opts ...any) error {
if err := db.init(); err != nil {
return err
}
defOptMu.RLock()
pogrebopts := defaultPogrebOptions
defOptMu.RUnlock()
if len(opts) > 0 {
pogrebopts = normalizeOptions(opts...)
if pogrebopts == nil {
Expand All @@ -312,26 +323,22 @@ func (db *DB) With(storeName string) database.Filer {
db.mu.RUnlock()
return nil
}
if ok {
if d.closed == nil || d.DB == nil || d.closed.Load() {
db.mu.RUnlock()
db.mu.Lock()
delete(db.store, storeName)
if err := db.initStore(storeName, defaultPogrebOptions); err != nil {
_, _ = os.Stderr.WriteString("error creating pogreb store: " + err.Error())
db.mu.Unlock()
return nil
}
if d.closed == nil || d.DB == nil || d.closed.Load() {
db.mu.RUnlock()
db.mu.Lock()
delete(db.store, storeName)
defOptMu.RLock()
if err := db.initStore(storeName, defaultPogrebOptions); err != nil {
_, _ = os.Stderr.WriteString("error creating pogreb store: " + err.Error())
defOptMu.RUnlock()
db.mu.Unlock()
return db.store[storeName]
return nil
}
db.mu.RUnlock()
return d
defOptMu.RUnlock()
db.mu.Unlock()
return db.store[storeName]
}
db.mu.RUnlock()
if d != nil && d.DB == nil {
d = nil
}
return d
}

Expand Down Expand Up @@ -551,14 +558,19 @@ func (db *DB) discover(force ...bool) ([]string, error) {
if _, ok := db.store[name]; ok {
continue
}
if err = db.initStore(name, defaultPogrebOptions); err != nil {
defOptMu.RLock()
defOpt := defaultPogrebOptions
defOptMu.RUnlock()
if err = db.initStore(name, defOpt); err != nil {
errs = append(errs, err)
continue
}
stores = append(stores, name)
aclosed := &atomic.Bool{}
aclosed.Store(false)
db.store[name] = &Store{DB: db.store[name].DB, closed: aclosed, opts: defaultPogrebOptions}
defOptMu.RLock()
db.store[name] = &Store{DB: db.store[name].DB, closed: aclosed, opts: defOpt}
defOptMu.RUnlock()
}

for _, e := range errs {
Expand Down
14 changes: 13 additions & 1 deletion pogreb/reg.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,19 @@ import (
)

func init() {
registry.RegisterKeeper("pogreb", func(path string) (database.Keeper, error) {
registry.RegisterKeeper("pogreb", func(path string, opts ...any) (database.Keeper, error) {
if len(opts) > 1 {
return nil, ErrInvalidOptions
}
if len(opts) == 1 {
casted, castErr := castOptions(opts...)
if castErr != nil {
return nil, castErr
}
defOptMu.Lock()
defaultPogrebOptions = casted
defOptMu.Unlock()
}
db := OpenDB(path)
err := db.init()
return db, err
Expand Down
2 changes: 1 addition & 1 deletion test/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (m *MockKeeper) WriteMeta(path string) error {
mockMu.Lock()
mockKeepers[m.name] = m.AllStores()
mockMu.Unlock()
registry.RegisterKeeper(m.name, func(path string) (database.Keeper, error) {
registry.RegisterKeeper(m.name, func(path string, _ ...any) (database.Keeper, error) {
return NewMockKeeper(m.name), nil
})

Expand Down
2 changes: 1 addition & 1 deletion util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package database

import "errors"

type KeeperCreator func(path string) (Keeper, error)
type KeeperCreator func(path string, opt ...any) (Keeper, error)

var ErrNotStore = errors.New("provided Filer does not implement Store")

Expand Down

0 comments on commit b84c4a9

Please sign in to comment.