Skip to content

Commit

Permalink
Massive, likely breaking (!): Implement test suite, various Keeper fi…
Browse files Browse the repository at this point in the history
…xes/refactoring
  • Loading branch information
yunginnanet committed Jun 19, 2024
1 parent e41e182 commit 217d4d2
Show file tree
Hide file tree
Showing 19 changed files with 566 additions and 140 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ jobs:
with:
go-version: '1.22'
- name: Water's fine
run: go test -race -v ./...
run: go database_test -race -v ./...
- name: Run coverage
run: go test -race -coverprofile=coverage.txt -covermode=atomic ./...
run: go database_test -race -coverprofile=coverage.txt -covermode=atomic ./...
- name: Upload coverage to Codecov
run: bash <(curl -s https://codecov.io/bash)
175 changes: 130 additions & 45 deletions bitcask/bitcask.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"path/filepath"
"strings"
"sync"
"sync/atomic"

"git.tcp.direct/Mirrors/bitcask-mirror"

"git.tcp.direct/tcp.direct/database"
"git.tcp.direct/tcp.direct/database/kv"
"git.tcp.direct/tcp.direct/database/metadata"
"git.tcp.direct/tcp.direct/database/models"
)
Expand All @@ -20,23 +22,43 @@ import (
type Store struct {
*bitcask.Bitcask
database.Searcher
closed bool
closed *atomic.Bool
}

// Get is a wrapper around the bitcask Get function for error regularization.
func (s *Store) Get(key []byte) ([]byte, error) {
if s.closed.Load() {
return nil, fs.ErrClosed
}
ret, err := s.Bitcask.Get(key)
err = kv.RegularizeKVError(key, ret, err)
return ret, err
}

// Close is a wrapper around the bitcask Close function.
func (s *Store) Close() error {
if s.closed.Load() {
return fs.ErrClosed
}
s.closed.Store(true)
return s.Bitcask.Close()
}

// Backend returns the underlying bitcask instance.
func (s Store) Backend() any {
func (s *Store) Backend() any {
return s.Bitcask
}

// DB is a mapper of a Filer and Searcher implementation using Bitcask.
type DB struct {
store map[string]Store
store map[string]*Store
path string
mu *sync.RWMutex
meta *metadata.Metadata
initialized bool
initialized *atomic.Bool
}

// Meta returns the [models.Metadata] implementation of the bitcask keeper.
func (db *DB) Meta() models.Metadata {
db.mu.RLock()
m := db.meta
Expand All @@ -60,12 +82,9 @@ func (db *DB) AllStores() map[string]database.Filer {

func (db *DB) init() error {
var err error
db.mu.RLock()
if db.initialized {
db.mu.RUnlock()
if db.initialized.Load() {
return nil
}
db.mu.RUnlock()
db.mu.Lock()
defer db.mu.Unlock()
if _, err = os.Stat(db.path); os.IsNotExist(err) {
Expand All @@ -86,7 +105,7 @@ func (db *DB) init() error {
if db.meta.Type() != db.Type() {
return fmt.Errorf("meta.json is not a bitcask meta file")
}
db.initialized = true
db.initialized.Store(true)
return nil
}

Expand All @@ -95,7 +114,7 @@ func (db *DB) init() error {
if err != nil {
return fmt.Errorf("error creating meta file: %w", err)
}
db.initialized = true
db.initialized.Store(true)
return nil
}

Expand All @@ -104,12 +123,14 @@ func (db *DB) init() error {

// OpenDB will either open an existing set of bitcask datastores at the given directory, or it will create a new one.
func OpenDB(path string) *DB {
ainit := &atomic.Bool{}
ainit.Store(false)
return &DB{
store: make(map[string]Store),
store: make(map[string]*Store),
path: path,
mu: &sync.RWMutex{},
meta: nil,
initialized: false,
initialized: ainit,
}
}

Expand All @@ -123,9 +144,8 @@ func (db *DB) Discover() ([]string, error) {
stores := make([]string, 0)
errs := make([]error, 0)
if db.store == nil {
db.store = make(map[string]Store)
db.store = make(map[string]*Store)
}
os.Stat(filepath.Join(db.path, "meta.json"))

entries, err := fs.ReadDir(os.DirFS(db.path), ".")
if err != nil {
Expand Down Expand Up @@ -157,17 +177,19 @@ func (db *DB) Discover() ([]string, error) {
}
println("WARN: bitcask store", name, "has bad metadata, attempting to repair")
oldMeta := filepath.Join(db.path, name, "meta.json")
newMeta := filepath.Join(db.path, name, "meta.json.backup")
println("WARN: renaming", oldMeta, "to", newMeta)
// likely defunct lockfile is present too, remove it
if osErr := os.Rename(oldMeta, newMeta); osErr != nil {
println("WARN: failed to rename", oldMeta, "to", newMeta, ":", osErr)
return
oldMetaBackup := filepath.Join(db.path, name, "meta.json.backup")
println("WARN: renaming", oldMeta, "to", oldMetaBackup)
if osErr := os.Rename(oldMeta, oldMetaBackup); osErr != nil {
println("Fatal: failed to rename", oldMeta, "to", oldMetaBackup, ":", osErr)
panic(osErr)
}

// likely defunct lockfile is present too, remove it
if _, serr := os.Stat(filepath.Join(db.path, name, "lock")); serr == nil {
println("WARN: removing defunct lockfile")
_ = os.Remove(filepath.Join(db.path, name, "lock"))
}

retry = true
})
if retry {
Expand All @@ -176,7 +198,9 @@ func (db *DB) Discover() ([]string, error) {
errs = append(errs, e)
continue
}
db.store[name] = Store{Bitcask: c}
aclosed := &atomic.Bool{}
aclosed.Store(false)
db.store[name] = &Store{Bitcask: c, closed: aclosed}
stores = append(stores, name)
}

Expand Down Expand Up @@ -225,44 +249,82 @@ func (db *DB) Init(storeName string, opts ...any) error {
}
var bitcaskopts []bitcask.Option
for _, opt := range opts {
if _, ok := opt.(bitcask.Option); !ok {
return errors.New("invalid bitcask option type")
_, isOptOK := opt.(bitcask.Option)
_, isOptsOK := opt.([]bitcask.Option)
if !isOptOK && !isOptsOK {
return fmt.Errorf("invalid bitcask option type (%T): %v", opt, opt)
}
if isOptOK {
bitcaskopts = append(bitcaskopts, opt.(bitcask.Option))
}
if isOptsOK {
bitcaskopts = append(bitcaskopts, opt.([]bitcask.Option)...)
}
bitcaskopts = append(bitcaskopts, opt.(bitcask.Option))
}
db.mu.Lock()
defer db.mu.Unlock()

if len(defaultBitcaskOptions) > 0 {
bitcaskopts = append(bitcaskopts, defaultBitcaskOptions...)
bitcaskopts = append(defaultBitcaskOptions, bitcaskopts...)
}

db.mu.Lock()
err := db.initStore(storeName, bitcaskopts...)
db.mu.Unlock()

return err
}

// initStore is a helper function to initialize a bitcask store, caller must hold keeper's lock.
func (db *DB) initStore(storeName string, opts ...bitcask.Option) error {
if _, ok := db.store[storeName]; ok {
return ErrStoreExists
}
path := db.Path()
if !strings.HasSuffix(db.Path(), "/") {
path = db.Path() + "/"
}
c, e := bitcask.Open(path+storeName, bitcaskopts...)

c, e := bitcask.Open(filepath.Join(db.Path(), storeName), opts...)
if e != nil {
return e
}
db.store[storeName] = Store{Bitcask: c}

aclosed := &atomic.Bool{}
aclosed.Store(false)
db.store[storeName] = &Store{Bitcask: c, closed: aclosed}
return nil
}

// Destroy will remove the bitcask store and all data associated with it.
func (db *DB) Destroy(storeName string) error {
db.mu.Lock()
defer db.mu.Unlock()
st, ok := db.store[storeName]
if !ok {
return ErrBogusStore
}
err := st.Close()
if err != nil {
return err
}
delete(db.store, storeName)
return os.RemoveAll(filepath.Join(db.path, storeName))
}

// With calls the given underlying bitcask instance.
func (db *DB) With(storeName string) database.Store {
if err := db.init(); err != nil {
panic(err)
}
db.mu.RLock()
defer db.mu.RUnlock()
d, ok := db.store[storeName]
if ok {
if ok && !d.closed.Load() {
db.mu.RUnlock()
return d
}
if ok && d.closed.Load() {
db.mu.RUnlock()
db.mu.Lock()
delete(db.store, storeName)
db.mu.Unlock()
return nil
}
db.mu.RUnlock()
return nil
}

Expand All @@ -271,42 +333,54 @@ func (db *DB) WithNew(storeName string, opts ...any) database.Filer {
if err := db.init(); err != nil {
panic(err)
}
db.mu.RLock()
defer db.mu.RUnlock()

newOpts := make([]bitcask.Option, 0)
for _, opt := range opts {
if _, ok := opt.(bitcask.Option); !ok {
fmt.Println("invalid bitcask option type: ", opt)
continue
}
defaultBitcaskOptions = append(defaultBitcaskOptions, opt.(bitcask.Option))
newOpts = append(newOpts, opt.(bitcask.Option))
}

db.mu.Lock()
defer db.mu.Unlock()

d, ok := db.store[storeName]
if ok {
if d.Bitcask == nil || d.closed == nil || d.closed.Load() {
delete(db.store, storeName)
if err := db.initStore(storeName, newOpts...); err != nil {
fmt.Println("error re-initializing bitcask store: ", err.Error())
}
return db.store[storeName]
}
return d
}
db.mu.RUnlock()
err := db.Init(storeName)
db.mu.RLock()

err := db.initStore(storeName, newOpts...)
if err != nil {
fmt.Println("error creating bitcask store: ", err)

}
return db.store[storeName]
}

// Close is a simple shim for bitcask's Close function.
func (db *DB) Close(storeName string) error {
db.mu.Lock()
defer db.mu.Unlock()
db.mu.RLock()
st, ok := db.store[storeName]
if !ok {
db.mu.RUnlock()
return ErrBogusStore
}
db.mu.RUnlock()
err := st.Close()
if err != nil {
return err
}
db.mu.Lock()
delete(db.store, storeName)
db.mu.Unlock()
return nil
}

Expand Down Expand Up @@ -334,7 +408,10 @@ const (
// In the case of an error, withAll will continue and return a compound form of any errors that occurred.
// For now this is just for Close and Sync, thusly it does a hard lock on the Keeper.
func (db *DB) withAll(action withAllAction) error {
if db == nil || db.store == nil || len(db.store) < 1 {
if db == nil || db.store == nil {
panic("bitcask: nil db or db.store")
}
if len(db.store) < 1 {
return ErrNoStores
}
var errs = make([]error, len(db.store))
Expand All @@ -344,13 +421,19 @@ func (db *DB) withAll(action withAllAction) error {
errs = append(errs, namedErr(name, ErrBogusStore))
continue
}
if store.closed.Load() {
continue
}
switch action {
case dclose:
db.mu.Lock()
closeErr := store.Close()
if errors.Is(closeErr, fs.ErrClosed) {
continue
}
err = namedErr(name, closeErr)
delete(db.store, name)
db.mu.Unlock()
case dsync:
err = namedErr(name, store.Sync())
default:
Expand Down Expand Up @@ -391,6 +474,8 @@ func (db *DB) SyncAll() error {
return db.withAll(dsync)
}

// Type returns the type of keeper, in this case "bitcask".
// This is in order to implement [database.Keeper].
func (db *DB) Type() string {
return "bitcask"
}
Loading

0 comments on commit 217d4d2

Please sign in to comment.