Skip to content

Commit

Permalink
Feat: Migration
Browse files Browse the repository at this point in the history
  • Loading branch information
yunginnanet committed Jun 22, 2024
1 parent 0f67d6e commit c9da7d1
Show file tree
Hide file tree
Showing 6 changed files with 562 additions and 1 deletion.
27 changes: 27 additions & 0 deletions backup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"git.tcp.direct/kayos/common/entropy"
"github.com/davecgh/go-spew/spew"
)

Expand Down Expand Up @@ -156,6 +157,14 @@ func TestTarGzBackup_Metadata(t *testing.T) {
if !meta.Date.Equal(timestamp) {
t.Errorf("expected timestamp %v, got %v", timestamp, meta.Date)
}

t.Run("json", func(t *testing.T) {
jDat, err := meta.MarshalJSON()
if err != nil {
t.Fatalf("error marshalling metadata: %v", err)
}
t.Logf("metadata json: %s", jDat)
})
}

func TestRestoreTarGzBackup(t *testing.T) {
Expand Down Expand Up @@ -204,3 +213,21 @@ func TestRestoreTarGzBackup(t *testing.T) {
t.Errorf("expected file contents yeets2, got %s", tmp)
}
}

// TestFailures exercises the error paths of NewTarGzBackup: a nonexistent
// source directory, a nonexistent destination, an unwritable destination,
// and a source path that is a file rather than a directory.
func TestFailures(t *testing.T) {
	if _, err := NewTarGzBackup(entropy.RandStrWithUpper(10), "asdf", nil); err == nil {
		t.Error("expected error, got nil")
	}
	// BUG FIX: the Getwd error was previously discarded with `_`.
	cwd, err := os.Getwd()
	if err != nil {
		t.Fatalf("error getting working directory: %v", err)
	}
	if _, err := NewTarGzBackup(cwd, entropy.RandStrWithUpper(10), nil); err == nil {
		t.Error("expected error, got nil")
	}
	if _, err := NewTarGzBackup(cwd, "/dev/null", nil); err == nil {
		t.Error("expected error, got nil")
	}
	path := filepath.Join(t.TempDir(), "yeet.txt")
	// BUG FIX: the WriteFile error was previously ignored; a failed write
	// would make the assertion below test the wrong thing.
	if err := os.WriteFile(path, []byte("yeets"), 0644); err != nil {
		t.Fatalf("error writing test file: %v", err)
	}
	if _, err := NewTarGzBackup(path, t.TempDir(), nil); err == nil {
		t.Error("expected error, got nil")
	}
}
7 changes: 7 additions & 0 deletions bitcask/bitcask_backup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package bitcask

import "testing"

// TestBackupAll is an empty placeholder; no assertions are made yet.
// TODO(review): implement coverage for BackupAll or remove this stub.
func TestBackupAll(t *testing.T) {

}
2 changes: 1 addition & 1 deletion keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type Keeper interface {
// Init should initialize our Filer at the given path, to be referenced and called by dataStore.
Init(name string, options ...any) error
// With provides access to the given dataStore by providing a pointer to the related Filer.
With(name string) Store
With(name string) Filer
// WithNew should initialize a new Filer at the given path and return a pointer to it.
WithNew(name string, options ...any) Filer

Expand Down
223 changes: 223 additions & 0 deletions migrate/migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
// Package migrate implements the migration of data from one type of Keeper to another.
package migrate

import (
	"context"
	"errors"
	"fmt"
	"sync"

	"git.tcp.direct/tcp.direct/database"
)

var (
	// ErrNoStores is returned when the source Keeper contains no stores to migrate.
	ErrNoStores = errors.New("no stores found in source keeper")
	// ErrDupKeys is the sentinel wrapped by ErrDuplicateKeys; match it with errors.Is.
	ErrDupKeys = errors.New(
		"duplicate keys found in destination stores, enable skipping or clobbering of existing data to continue migration",
	)
)

// ErrDuplicateKeys reports keys that exist in both the source and destination
// keepers; it unwraps to the ErrDupKeys sentinel.
type ErrDuplicateKeys struct {
	// Duplicates maps store names to the raw keys found in both keepers.
	// map[store][]keys
	Duplicates map[string][][]byte
}

// Unwrap returns the ErrDupKeys sentinel so that errors.Is(err, ErrDupKeys) matches.
func (e ErrDuplicateKeys) Unwrap() error {
	return ErrDupKeys
}

// Error implements the error interface.
// NOTE(review): this duplicates the ErrDupKeys message text verbatim; keep the two in sync.
func (e ErrDuplicateKeys) Error() string {
	return "duplicate keys found in destination stores, enable skipping or clobbering of existing data to continue migration"
}

// NewDuplicateKeysErr wraps a store-name-to-keys map in an *ErrDuplicateKeys.
func NewDuplicateKeysErr(duplicates map[string][][]byte) *ErrDuplicateKeys {
	e := new(ErrDuplicateKeys)
	e.Duplicates = duplicates
	return e
}

// Migrator copies every store and key from one Keeper (From) into another (To).
type Migrator struct {
	// From is the source Keeper being read; To is the destination being written.
	From database.Keeper
	To database.Keeper

	// duplicateKeys records, per store name, the set of keys present in both
	// keepers; populated by CheckDupes and consulted by Migrate.
	duplicateKeys map[string]map[string]struct{}

	// clobber overwrites duplicate keys in the destination; skipExisting
	// leaves them untouched. Both default to false, which aborts migration
	// when duplicates are found.
	clobber bool
	skipExisting bool

	// mu guards duplicateKeys and the flags above.
	mu sync.Mutex
}

// mapMaptoMapSlice converts the internal duplicate-key representation
// (store name -> set of key strings) into the public form carried by
// ErrDuplicateKeys (store name -> slice of raw keys). Stores with no
// recorded keys are omitted from the result, matching the behavior of
// building the map purely via append.
func mapMaptoMapSlice(m map[string]map[string]struct{}) map[string][][]byte {
	// Pre-size the output map and each key slice to avoid repeated growth.
	out := make(map[string][][]byte, len(m))
	for store, keys := range m {
		if len(keys) == 0 {
			continue
		}
		ks := make([][]byte, 0, len(keys))
		for key := range keys {
			ks = append(ks, []byte(key))
		}
		out[store] = ks
	}
	return out
}

// NewMigrator constructs a Migrator from two Keepers, running Discover on
// both so their store lists are populated before migration. Skipping and
// clobbering of duplicate destination keys are disabled by default; enable
// them with WithSkipExisting / WithClobber.
func NewMigrator(from, to database.Keeper) (*Migrator, error) {
	// Wrap discovery errors so the caller can tell which keeper failed.
	if _, err := from.Discover(); err != nil {
		return nil, fmt.Errorf("discovering source stores: %w", err)
	}
	if _, err := to.Discover(); err != nil {
		return nil, fmt.Errorf("discovering destination stores: %w", err)
	}
	return &Migrator{
		From:         from,
		To:           to,
		clobber:      false,
		skipExisting: false,
	}, nil
}

// WithClobber enables overwriting of pre-existing data in the destination
// Keeper and returns the Migrator for chaining.
func (m *Migrator) WithClobber() *Migrator {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.clobber = true
	return m
}

// WithSkipExisting makes the Migrator leave pre-existing destination data
// untouched and returns the Migrator for chaining.
func (m *Migrator) WithSkipExisting() *Migrator {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.skipExisting = true
	return m
}

// CheckDupes scans every store that exists in both keepers and records keys
// present in both into m.duplicateKeys. It returns ErrNoStores when the
// source keeper is empty, nil when there are no collisions or when the
// Migrator is configured to skip or clobber existing data, and an
// *ErrDuplicateKeys describing the collisions otherwise.
func (m *Migrator) CheckDupes() error {
	fromStores := m.From.AllStores()
	toStores := m.To.AllStores()

	if len(fromStores) == 0 {
		return ErrNoStores
	}

	// BUG FIX: duplicateKeys and the config flags are written under m.mu
	// elsewhere (WithClobber/WithSkipExisting and the workers below), so all
	// access here must also hold the lock to avoid a data race.
	m.mu.Lock()
	if m.duplicateKeys == nil {
		m.duplicateKeys = make(map[string]map[string]struct{})
	}
	m.mu.Unlock()

	wg := &sync.WaitGroup{}

	// One worker per store that exists in both keepers; workers only touch
	// m.duplicateKeys while holding m.mu.
	for storeName, store := range fromStores {
		existingStore, ok := toStores[storeName]
		if !ok {
			continue
		}
		if existingStore.Len() == 0 {
			continue
		}
		wg.Add(1)
		go func(storeName string, store, existingStore database.Filer) {
			defer wg.Done()
			keys := existingStore.Keys()
			for _, key := range keys {
				if store.Has(key) {
					m.mu.Lock()
					if _, exists := m.duplicateKeys[storeName]; !exists {
						m.duplicateKeys[storeName] = make(map[string]struct{})
					}
					m.duplicateKeys[storeName][string(key)] = struct{}{}
					m.mu.Unlock()
				}
			}
		}(storeName, store, existingStore)
	}

	wg.Wait()

	// Read the results and config flags under the same lock that guards
	// their writes.
	m.mu.Lock()
	defer m.mu.Unlock()

	if len(m.duplicateKeys) == 0 || m.skipExisting || m.clobber {
		return nil
	}

	return NewDuplicateKeysErr(mapMaptoMapSlice(m.duplicateKeys))
}

// Migrate copies every key in every source store into the destination
// Keeper, one worker goroutine per non-empty store. Duplicate keys are
// handled according to the skipExisting/clobber flags; when neither is set,
// migration aborts with an *ErrDuplicateKeys. Both keepers are synced once
// all stores have been copied, and the first worker error (if any) is
// returned.
func (m *Migrator) Migrate() error {
	fromStores := m.From.AllStores()

	if len(fromStores) == 0 {
		return ErrNoStores
	}

	if err := m.CheckDupes(); err != nil {
		return err
	}

	// Hold the lock for the duration of the copy so the flags and the
	// duplicate-key table cannot be mutated mid-migration.
	m.mu.Lock()
	defer m.mu.Unlock()

	// Buffered so every worker can report one error without blocking.
	errCh := make(chan error, len(fromStores))
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // stops remaining workers when we return early on error

	wg := &sync.WaitGroup{}
	for srcStoreName, srcStore := range fromStores {
		if srcStore.Len() == 0 {
			continue
		}
		wg.Add(1)
		go func(storeName string, store database.Filer) {
			defer wg.Done()
			keys := store.Keys()
			for _, key := range keys {
				select {
				case <-ctx.Done():
					return
				default:
				}
				srcVal, err := m.From.With(storeName).Get(key)
				if err != nil {
					errCh <- err
					return
				}
				if _, exists := m.duplicateKeys[storeName][string(key)]; exists {
					if m.skipExisting {
						continue
					}
					if !m.clobber {
						errCh <- NewDuplicateKeysErr(mapMaptoMapSlice(m.duplicateKeys))
						return
					}
					if err = m.To.With(storeName).Put(key, srcVal); err != nil {
						errCh <- err
						return
					}
					continue
				}
				// BUG FIX: this Put error was previously dropped (bare
				// return), silently losing data on a failed write.
				if err = m.To.WithNew(storeName).Put(key, srcVal); err != nil {
					errCh <- err
					return
				}
			}
		}(srcStoreName, srcStore)
	}

	wgCh := make(chan struct{})

	go func() {
		wg.Wait()
		close(wgCh)
	}()

	select {
	case <-wgCh:
		// BUG FIX: an error sent just before the last worker finished could
		// race with wgCh and be lost; drain errCh non-blockingly.
		select {
		case err := <-errCh:
			return err
		default:
		}
	case err := <-errCh:
		return err
	}

	fStores := m.From.AllStores()
	tStores := m.To.AllStores()

	if len(fStores) != len(tStores) {
		return errors.New("number of stores in source and destination keepers do not match")
	}

	syncErrs := make([]error, 0, 2)
	syncErrs = append(syncErrs, m.From.SyncAll(), m.To.SyncAll())
	return errors.Join(syncErrs...)
}
102 changes: 102 additions & 0 deletions migrate/migrate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package migrate

import (
"errors"
"testing"

"git.tcp.direct/tcp.direct/database"
)

func TestMigrator_WithClobber(t *testing.T) {
from := database.NewMockKeeper("yeeeties")
to := database.NewMockKeeper("yooties")

migrator, err := NewMigrator(from, to)
if err != nil {
t.Fatalf("error creating migrator: %v", err)
}
migrator = migrator.WithClobber()

if !migrator.clobber {
t.Error("expected clobber to be true")
}
}

func TestMigrator_WithSkipExisting(t *testing.T) {
from := database.NewMockKeeper("yeeeties")
to := database.NewMockKeeper("yooties")

migrator, err := NewMigrator(from, to)
if err != nil {
t.Fatalf("error creating migrator: %v", err)
}
migrator = migrator.WithSkipExisting()

if !migrator.skipExisting {
t.Error("expected skipExisting to be true")
}
}

func TestMigrator_CheckDupes_NoStores(t *testing.T) {
from := database.NewMockKeeper("yeeeties")
to := database.NewMockKeeper("yooties")

migrator, err := NewMigrator(from, to)
if err != nil {
t.Fatalf("error creating migrator: %v", err)
}

err = migrator.CheckDupes()

if !errors.Is(err, ErrNoStores) {
t.Error("expected ErrNoStores error")
}
}

// TestMigrator_CheckDupes_DuplicateKeys verifies that CheckDupes reports a
// collision when the same key exists in both keepers.
func TestMigrator_CheckDupes_DuplicateKeys(t *testing.T) {
	from := database.NewMockKeeper("yeeeties")
	to := database.NewMockKeeper("yooties")

	// Seed the same key/value into both keepers to force a duplicate.
	for _, keeper := range []database.Keeper{from, to} {
		if err := keeper.WithNew("store1").Put([]byte("key1"), []byte("value1")); err != nil {
			t.Fatalf("error putting key1: %v", err)
		}
	}

	migrator, err := NewMigrator(from, to)
	if err != nil {
		t.Fatalf("error creating migrator: %v", err)
	}
	if !errors.Is(migrator.CheckDupes(), ErrDupKeys) {
		t.Error("expected ErrDuplicateKeys error")
	}
}

// TestMigrator_Success runs a full migration of a single key from one mock
// keeper to another and verifies it lands in the destination.
func TestMigrator_Success(t *testing.T) {
	from := database.NewMockKeeper("yeeeties")
	to := database.NewMockKeeper("yooties")

	if err := from.WithNew("store1").Put([]byte("key1"), []byte("value1")); err != nil {
		t.Fatalf("error putting key1: %v", err)
	}

	migrator, err := NewMigrator(from, to)
	if err != nil {
		t.Fatalf("error creating migrator: %v", err)
	}
	migrator = migrator.WithClobber()

	if err = migrator.Migrate(); err != nil {
		t.Errorf("expected no error, got %v", err)
	}

	if !to.With("store1").Has([]byte("key1")) {
		// BUG FIX: message previously read "to be to destination keeper".
		t.Error("expected key1 to be in destination keeper")
	}
}
Loading

0 comments on commit c9da7d1

Please sign in to comment.