From c9da7d191c43494e2e101122324f862a37a3bbd8 Mon Sep 17 00:00:00 2001
From: "kayos@tcp.direct"
Date: Fri, 21 Jun 2024 21:02:02 -0700
Subject: [PATCH] Feat: Migration

Add a migrate package that copies every store and key from one Keeper
to another, with opt-in clobbering or skipping of keys that already
exist in the destination. Also add an in-memory mock Keeper/Filer for
testing, and change Keeper.With to return a Filer.

---
 backup/backup_test.go          |  29 ++++
 bitcask/bitcask_backup_test.go |   7 ++
 keeper.go                      |   2 +-
 migrate/migrate.go             | 251 +++++++++++++++++++++++++++++++
 migrate/migrate_test.go        | 102 ++++++++++++++
 mock.go                        | 214 ++++++++++++++++++++++++++
 6 files changed, 604 insertions(+), 1 deletion(-)
 create mode 100644 bitcask/bitcask_backup_test.go
 create mode 100644 migrate/migrate.go
 create mode 100644 migrate/migrate_test.go
 create mode 100644 mock.go

diff --git a/backup/backup_test.go b/backup/backup_test.go
index c90c98d..843cbda 100644
--- a/backup/backup_test.go
+++ b/backup/backup_test.go
@@ -8,6 +8,7 @@ import (
 	"testing"
 	"time"
 
+	"git.tcp.direct/kayos/common/entropy"
 	"github.com/davecgh/go-spew/spew"
 )
 
@@ -156,6 +157,14 @@ func TestTarGzBackup_Metadata(t *testing.T) {
 	if !meta.Date.Equal(timestamp) {
 		t.Errorf("expected timestamp %v, got %v", timestamp, meta.Date)
 	}
+
+	t.Run("json", func(t *testing.T) {
+		jDat, err := meta.MarshalJSON()
+		if err != nil {
+			t.Fatalf("error marshalling metadata: %v", err)
+		}
+		t.Logf("metadata json: %s", jDat)
+	})
 }
 
 func TestRestoreTarGzBackup(t *testing.T) {
@@ -204,3 +213,23 @@
 		t.Errorf("expected file contents yeets2, got %s", tmp)
 	}
 }
+
+func TestFailures(t *testing.T) {
+	if _, err := NewTarGzBackup(entropy.RandStrWithUpper(10), "asdf", nil); err == nil {
+		t.Error("expected error, got nil")
+	}
+	cwd, _ := os.Getwd()
+	if _, err := NewTarGzBackup(cwd, entropy.RandStrWithUpper(10), nil); err == nil {
+		t.Error("expected error, got nil")
+	}
+	if _, err := NewTarGzBackup(cwd, "/dev/null", nil); err == nil {
+		t.Error("expected error, got nil")
+	}
+	path := filepath.Join(t.TempDir(), "yeet.txt")
+	if err := os.WriteFile(path, []byte("yeets"), 0644); err != nil {
+		t.Fatalf("error writing test file: %v", err)
+	}
+	if _, err := NewTarGzBackup(path, t.TempDir(), nil); err == nil {
+		t.Error("expected error, got nil")
+	}
+}
diff --git a/bitcask/bitcask_backup_test.go b/bitcask/bitcask_backup_test.go
new file mode 100644
index 0000000..eb807e8
--- /dev/null
+++ b/bitcask/bitcask_backup_test.go
@@ -0,0 +1,7 @@
+package bitcask
+
+import "testing"
+
+func TestBackupAll(t *testing.T) {
+	t.Skip("TODO: exercise Keeper.BackupAll for the bitcask backend")
+}
diff --git a/keeper.go b/keeper.go
index 326c4fd..2f3331e 100644
--- a/keeper.go
+++ b/keeper.go
@@ -13,7 +13,7 @@ type Keeper interface {
 	// Init should initialize our Filer at the given path, to be referenced and called by dataStore.
 	Init(name string, options ...any) error
 	// With provides access to the given dataStore by providing a pointer to the related Filer.
-	With(name string) Store
+	With(name string) Filer
 	// WithNew should initialize a new Filer at the given path and return a pointer to it.
 	WithNew(name string, options ...any) Filer
 
diff --git a/migrate/migrate.go b/migrate/migrate.go
new file mode 100644
index 0000000..138a818
--- /dev/null
+++ b/migrate/migrate.go
@@ -0,0 +1,251 @@
+// Package migrate implements the migration of data from one type of Keeper to another.
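+//
+// A minimal usage sketch (srcKeeper and dstKeeper stand in for any two
+// initialized Keeper implementations):
+//
+//	m, err := migrate.NewMigrator(srcKeeper, dstKeeper)
+//	if err != nil {
+//		// handle discovery failure
+//	}
+//	if err = m.WithSkipExisting().Migrate(); err != nil {
+//		// handle migration failure
+//	}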
+package migrate
+
+import (
+	"context"
+	"errors"
+	"sync"
+
+	"git.tcp.direct/tcp.direct/database"
+)
+
+var (
+	// ErrNoStores is returned when the source Keeper contains no stores to migrate.
+	ErrNoStores = errors.New("no stores found in source keeper")
+	// ErrDupKeys is the sentinel error wrapped by ErrDuplicateKeys.
+	ErrDupKeys = errors.New(
+		"duplicate keys found in destination stores, enable skipping or clobbering of existing data to continue migration",
+	)
+)
+
+// ErrDuplicateKeys reports keys that already exist in destination stores, grouped by store name.
+type ErrDuplicateKeys struct {
+	// map[store][]keys
+	Duplicates map[string][][]byte
+}
+
+func (e ErrDuplicateKeys) Unwrap() error {
+	return ErrDupKeys
+}
+
+func (e ErrDuplicateKeys) Error() string {
+	return ErrDupKeys.Error()
+}
+
+// NewDuplicateKeysErr wraps the given store-to-duplicate-keys map in an *ErrDuplicateKeys.
+func NewDuplicateKeysErr(duplicates map[string][][]byte) *ErrDuplicateKeys {
+	return &ErrDuplicateKeys{Duplicates: duplicates}
+}
+
+// Migrator copies all stores and keys from the From Keeper into the To Keeper.
+type Migrator struct {
+	From database.Keeper
+	To   database.Keeper
+
+	duplicateKeys map[string]map[string]struct{}
+
+	clobber      bool
+	skipExisting bool
+
+	mu sync.Mutex
+}
+
+func mapMaptoMapSlice(m map[string]map[string]struct{}) map[string][][]byte {
+	out := make(map[string][][]byte)
+	for store, keys := range m {
+		for key := range keys {
+			out[store] = append(out[store], []byte(key))
+		}
+	}
+	return out
+}
+
+// NewMigrator discovers the stores in both Keepers and returns a Migrator that neither clobbers nor skips existing data.
+func NewMigrator(from, to database.Keeper) (*Migrator, error) {
+	if _, err := from.Discover(); err != nil {
+		return nil, err
+	}
+	if _, err := to.Discover(); err != nil {
+		return nil, err
+	}
+	return &Migrator{
+		From:         from,
+		To:           to,
+		clobber:      false,
+		skipExisting: false,
+	}, nil
+}
+
+// WithClobber sets the clobber flag on the Migrator, allowing it to overwrite existing data in the destination Keeper.
+func (m *Migrator) WithClobber() *Migrator {
+	m.mu.Lock()
+	m.clobber = true
+	m.mu.Unlock()
+	return m
+}
+
+// WithSkipExisting sets the skipExisting flag on the Migrator, allowing it to skip existing data in the destination Keeper.
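+// If both WithSkipExisting and WithClobber are enabled, skipping takes
+// precedence when Migrate encounters a duplicate key.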
+func (m *Migrator) WithSkipExisting() *Migrator {
+	m.mu.Lock()
+	m.skipExisting = true
+	m.mu.Unlock()
+	return m
+}
+
+// CheckDupes records keys that exist in both a source store and its destination counterpart.
+func (m *Migrator) CheckDupes() error {
+	fromStores := m.From.AllStores()
+	toStores := m.To.AllStores()
+
+	if len(fromStores) == 0 {
+		return ErrNoStores
+	}
+
+	if m.duplicateKeys == nil {
+		m.duplicateKeys = make(map[string]map[string]struct{})
+	}
+
+	wg := &sync.WaitGroup{}
+
+	for storeName, store := range fromStores {
+		existingStore, ok := toStores[storeName]
+		if !ok {
+			continue
+		}
+		if existingStore.Len() == 0 {
+			continue
+		}
+		wg.Add(1)
+		go func(storeName string, store, existingStore database.Filer) {
+			defer wg.Done()
+			keys := existingStore.Keys()
+			for _, key := range keys {
+				if store.Has(key) {
+					m.mu.Lock()
+					if _, exists := m.duplicateKeys[storeName]; !exists {
+						m.duplicateKeys[storeName] = make(map[string]struct{})
+					}
+					m.duplicateKeys[storeName][string(key)] = struct{}{}
+					m.mu.Unlock()
+				}
+			}
+		}(storeName, store, existingStore)
+	}
+
+	wg.Wait()
+
+	if len(m.duplicateKeys) == 0 || m.skipExisting || m.clobber {
+		return nil
+	}
+
+	m.mu.Lock()
+	mslice := mapMaptoMapSlice(m.duplicateKeys)
+	m.mu.Unlock()
+
+	return NewDuplicateKeysErr(mslice)
+}
+
+// Migrate copies every key in every source store into the destination Keeper, then syncs both Keepers.
+func (m *Migrator) Migrate() error {
+	fromStores := m.From.AllStores()
+
+	if len(fromStores) == 0 {
+		return ErrNoStores
+	}
+
+	if err := m.CheckDupes(); err != nil {
+		return err
+	}
+
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	errCh := make(chan error, len(fromStores))
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	wg := &sync.WaitGroup{}
+	for srcStoreName, srcStore := range fromStores {
+		if srcStore.Len() == 0 {
+			continue
+		}
+		wg.Add(1)
+		go func(storeName string, store database.Filer) {
+			defer wg.Done()
+			keys := store.Keys()
+			for _, key := range keys {
+				select {
+				case <-ctx.Done():
+					return
+				default:
+				}
+				srcVal, err := m.From.With(storeName).Get(key)
+				if err != nil {
+					errCh <- err
+					return
+				}
+				if _, exists := m.duplicateKeys[storeName][string(key)]; exists {
+					if m.skipExisting {
+						continue
+					}
+					if !m.clobber {
+						errCh <- NewDuplicateKeysErr(mapMaptoMapSlice(m.duplicateKeys))
+						return
+					}
+					if err = m.To.With(storeName).Put(key, srcVal); err != nil {
+						errCh <- err
+						return
+					}
+					continue
+				}
+				if err = m.To.WithNew(storeName).Put(key, srcVal); err != nil {
+					errCh <- err
+					return
+				}
+			}
+		}(srcStoreName, srcStore)
+	}
+
+	wgCh := make(chan struct{})
+
+	go func() {
+		wg.Wait()
+		close(wgCh)
+	}()
+
+	select {
+	case <-wgCh:
+		// all workers are done; surface any error that raced the closing of wgCh
+		select {
+		case err := <-errCh:
+			return err
+		default:
+		}
+	case err := <-errCh:
+		return err
+	}
+
+	fStores := m.From.AllStores()
+	tStores := m.To.AllStores()
+
+	if len(fStores) != len(tStores) {
+		return errors.New("number of stores in source and destination keepers do not match")
+	}
+
+	syncErrs := make([]error, 0, 2)
+	syncErrs = append(syncErrs, m.From.SyncAll(), m.To.SyncAll())
+	return errors.Join(syncErrs...)
+}
diff --git a/migrate/migrate_test.go b/migrate/migrate_test.go
new file mode 100644
index 0000000..3ee8f6c
--- /dev/null
+++ b/migrate/migrate_test.go
@@ -0,0 +1,102 @@
+package migrate
+
+import (
+	"errors"
+	"testing"
+
+	"git.tcp.direct/tcp.direct/database"
+)
+
+func TestMigrator_WithClobber(t *testing.T) {
+	from := database.NewMockKeeper("yeeeties")
+	to := database.NewMockKeeper("yooties")
+
+	migrator, err := NewMigrator(from, to)
+	if err != nil {
+		t.Fatalf("error creating migrator: %v", err)
+	}
+	migrator = migrator.WithClobber()
+
+	if !migrator.clobber {
+		t.Error("expected clobber to be true")
+	}
+}
+
+func TestMigrator_WithSkipExisting(t *testing.T) {
+	from := database.NewMockKeeper("yeeeties")
+	to := database.NewMockKeeper("yooties")
+
+	migrator, err := NewMigrator(from, to)
+	if err != nil {
+		t.Fatalf("error creating migrator: %v", err)
+	}
+	migrator = migrator.WithSkipExisting()
+
+	if !migrator.skipExisting {
+		t.Error("expected skipExisting to be true")
+	}
+}
+
+func TestMigrator_CheckDupes_NoStores(t *testing.T) {
+	from := database.NewMockKeeper("yeeeties")
+	to := database.NewMockKeeper("yooties")
+
+	migrator, err := NewMigrator(from, to)
+	if err != nil {
+		t.Fatalf("error creating migrator: %v", err)
+	}
+
+	err = migrator.CheckDupes()
+
+	if !errors.Is(err, ErrNoStores) {
+		t.Error("expected ErrNoStores error")
+	}
+}
+
+func TestMigrator_CheckDupes_DuplicateKeys(t *testing.T) {
+	from := database.NewMockKeeper("yeeeties")
+	to := database.NewMockKeeper("yooties")
+
+	if err := from.WithNew("store1").Put([]byte("key1"), []byte("value1")); err != nil {
+		t.Fatalf("error putting key1: %v", err)
+	}
+	if err := to.WithNew("store1").Put([]byte("key1"), []byte("value1")); err != nil {
+		t.Fatalf("error putting key1: %v", err)
+	}
+
+	migrator, err := NewMigrator(from, to)
+	if err != nil {
+		t.Fatalf("error creating migrator: %v", err)
+	}
+
+	err = migrator.CheckDupes()
+
+	if !errors.Is(err, ErrDupKeys) {
+		t.Error("expected ErrDuplicateKeys error")
+	}
+}
+
+func TestMigrator_Success(t *testing.T) {
+	from := database.NewMockKeeper("yeeeties")
+	to := database.NewMockKeeper("yooties")
+
+	if err := from.WithNew("store1").Put([]byte("key1"), []byte("value1")); err != nil {
+		t.Fatalf("error putting key1: %v", err)
+	}
+
+	migrator, err := NewMigrator(from, to)
+	if err != nil {
+		t.Fatalf("error creating migrator: %v", err)
+	}
+	migrator = migrator.WithClobber()
+
+	err = migrator.Migrate()
+
+	if err != nil {
+		t.Errorf("expected no error, got %v", err)
+	}
+
+	if !to.With("store1").Has([]byte("key1")) {
+		t.Error("expected key1 to be in the destination keeper")
+	}
+}
diff --git a/mock.go b/mock.go
new file mode 100644
index 0000000..145d4a9
--- /dev/null
+++ b/mock.go
@@ -0,0 +1,214 @@
+package database
+
+import (
+	"errors"
+	"sync"
+
+	"git.tcp.direct/tcp.direct/database/models"
+)
+
+// MockFiler is an in-memory Filer implementation for tests.
+type MockFiler struct {
+	name   string
+	values map[string][]byte
+	closed bool
+	mu     sync.RWMutex
+}
+
+func (m *MockFiler) Backend() any {
+	return m.values
+}
+
+func (m *MockFiler) Has(key []byte) bool {
+	m.mu.RLock()
+	_, ok := m.values[string(key)]
+	m.mu.RUnlock()
+	return ok
+}
+
+// ErrKeyNotFound is returned by Get when the requested key is absent.
+var ErrKeyNotFound = errors.New("key not found")
+
+func (m *MockFiler) Get(key []byte) ([]byte, error) {
+	m.mu.RLock()
+	val, ok := m.values[string(key)]
+	m.mu.RUnlock()
+	if !ok {
+		return nil, ErrKeyNotFound
+	}
+	return val, nil
+}
+
+func (m *MockFiler) Put(key []byte, value []byte) error {
+	m.mu.Lock()
+	m.values[string(key)] = value
+	m.mu.Unlock()
+	return nil
+}
+
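+// Delete removes key from the store; deleting an absent key is a no-op,
+// mirroring Go map delete semantics.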
+func (m *MockFiler) Delete(key []byte) error {
+	m.mu.Lock()
+	delete(m.values, string(key))
+	m.mu.Unlock()
+	return nil
+}
+
+func (m *MockFiler) Close() error {
+	m.mu.Lock()
+	m.closed = true
+	m.mu.Unlock()
+	return nil
+}
+
+func (m *MockFiler) Sync() error {
+	return nil
+}
+
+func (m *MockFiler) Keys() [][]byte {
+	m.mu.RLock()
+	k := make([][]byte, 0, len(m.values))
+	for key := range m.values {
+		k = append(k, []byte(key))
+	}
+	m.mu.RUnlock()
+	return k
+}
+
+func (m *MockFiler) Len() int {
+	m.mu.RLock()
+	l := len(m.values)
+	m.mu.RUnlock()
+	return l
+}
+
+// MockKeeper is an in-memory Keeper implementation for tests.
+type MockKeeper struct {
+	name   string
+	path   string
+	stores map[string]Filer
+	mu     sync.RWMutex
+}
+
+func NewMockKeeper(name string) *MockKeeper {
+	return &MockKeeper{
+		name:   name,
+		stores: make(map[string]Filer),
+	}
+}
+
+func (m *MockKeeper) Path() string {
+	return m.path
+}
+
+func (m *MockKeeper) Init(name string, options ...any) error {
+	m.mu.Lock()
+	m.stores[name] = &MockFiler{name: name, values: make(map[string][]byte)}
+	m.mu.Unlock()
+	return nil
+}
+
+func (m *MockKeeper) With(name string) Filer {
+	m.mu.RLock()
+	s, ok := m.stores[name]
+	m.mu.RUnlock()
+	if !ok {
+		return nil
+	}
+	return s
+}
+
+// WithNew returns the store of that name, creating it under a single lock so concurrent callers cannot race.
+func (m *MockKeeper) WithNew(name string, options ...any) Filer {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	if existing, ok := m.stores[name]; ok {
+		return existing
+	}
+	store := &MockFiler{name: name, values: make(map[string][]byte)}
+	m.stores[name] = store
+	return store
+}
+
+func (m *MockKeeper) Destroy(name string) error {
+	m.mu.RLock()
+	_, ok := m.stores[name]
+	m.mu.RUnlock()
+	if !ok {
+		return errors.New("store not found")
+	}
+	m.mu.Lock()
+	delete(m.stores, name)
+	m.mu.Unlock()
+	return nil
+}
+
+func (m *MockKeeper) Discover() ([]string, error) {
+	m.mu.RLock()
+	names := make([]string, 0, len(m.stores))
+	for name := range m.stores {
+		names = append(names, name)
+	}
+	m.mu.RUnlock()
+	return names, nil
+}
+
+func (m *MockKeeper) AllStores() map[string]Filer {
+	m.mu.RLock()
+	stores := make(map[string]Filer, len(m.stores))
+	for name, store := range m.stores {
+		stores[name] = store
+	}
+	m.mu.RUnlock()
+	return stores
+}
+
+func (m *MockKeeper) BackupAll(archivePath string) (models.Backup, error) {
+	panic("not implemented")
+}
+
+func (m *MockKeeper) RestoreAll(archivePath string) error {
+	panic("not implemented")
+}
+
+func (m *MockKeeper) Meta() models.Metadata {
+	panic("not implemented")
+}
+
+func (m *MockKeeper) Close(name string) error {
+	m.mu.RLock()
+	store, ok := m.stores[name]
+	m.mu.RUnlock()
+	if !ok {
+		return errors.New("store not found")
+	}
+	return store.Close()
+}
+
+func (m *MockKeeper) CloseAll() error {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	for _, store := range m.stores {
+		if err := store.Close(); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (m *MockKeeper) SyncAll() error {
+	return nil
+}
+
+func (m *MockKeeper) SyncAndCloseAll() error {
+	return m.CloseAll()
+}
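+
+// Compile-time checks that the mocks satisfy the package interfaces. A
+// small sketch: it assumes the Filer and Keeper interfaces declared
+// elsewhere in this package.
+var (
+	_ Filer  = (*MockFiler)(nil)
+	_ Keeper = (*MockKeeper)(nil)
+)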